From eb3a902b56cca570c31e042c502e1aa6ebd03b9f Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 4 Jul 2025 22:05:53 +0800 Subject: [PATCH 1/6] 1 --- be/src/agent/agent_server.cpp | 8 +- be/src/agent/agent_server.h | 2 + be/src/agent/task_worker_pool.cpp | 135 ++++++++++++++++++ be/src/agent/task_worker_pool.h | 2 + be/src/cloud/config.cpp | 4 + be/src/cloud/config.h | 4 + be/src/olap/cumulative_compaction.cpp | 17 +++ be/src/olap/olap_server.cpp | 1 + be/src/olap/storage_engine.cpp | 23 +++ be/src/olap/storage_engine.h | 7 + be/src/olap/task/engine_clone_task.cpp | 51 +++++++ be/src/olap/task/engine_clone_task.h | 3 + .../olap/task/engine_publish_version_task.cpp | 19 +++ be/src/util/blocking_priority_queue.hpp | 1 + 14 files changed, 275 insertions(+), 2 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 55b4e8990563d7..38e4d6a59cbc67 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -42,6 +42,7 @@ #include "olap/snapshot_manager.h" #include "olap/storage_engine.h" #include "runtime/exec_env.h" +#include "util/work_thread_pool.hpp" namespace doris { @@ -168,8 +169,8 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) { _workers[TTaskType::ALTER] = std::make_unique( "ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& task) { return alter_tablet_callback(engine, task); }); - _workers[TTaskType::CLONE] = std::make_unique( - "CLONE", config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); }); + _workers[TTaskType::CLONE] = std::make_unique( + "CLONE", config::clone_worker_count,config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); }); _workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = std::make_unique( "STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, [&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); }); @@ -198,6 +199,9 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) { _report_workers.push_back(std::make_unique( "REPORT_INDEX_POLICY", _cluster_info, config::report_index_policy_interval_seconds,[&cluster_info = _cluster_info] { report_index_policy_callback(cluster_info); })); // clang-format on + + exec_env->storage_engine().to_local().missing_rowset_thread_pool = + static_cast(_workers[TTaskType::CLONE].get()); } void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env) { diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index e5b5b522ba0223..01333f2881cf85 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -24,6 +24,8 @@ #include #include +#include "util/work_thread_pool.hpp" + namespace doris { class TaskWorkerPoolIf; diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 5bd18d62ca6856..81d9a4161c794c 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -17,6 +17,7 @@ #include "agent/task_worker_pool.h" +#include #include #include #include @@ -85,6 +86,7 @@ #include "runtime/memory/global_memory_arbitrator.h" #include "runtime/snapshot_loader.h" #include "service/backend_options.h" +#include "util/brpc_client_cache.h" #include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/jni-util.h" @@ -609,6 +611,51 @@ Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) { }); } +Status PriorTaskWorkerPool::submit_high_prior_task(const TAgentTaskRequest& task) { + const TTaskType::type task_type = task.task_type; + int64_t signature = task.signature; + std::string type_str; + EnumToString(TTaskType, task_type, type_str); + auto req = std::make_unique(task); + + DCHECK(req->__isset.priority && req->priority == TPriority::HIGH); + do { + std::lock_guard lock(s_task_signatures_mtx); + auto& set = s_task_signatures[task_type]; + if (!set.contains(signature)) { + // 如果不存在,直接放到优先队列 + set.insert(signature); + std::lock_guard lock(_mtx); + _high_prior_queue.push_back(std::move(req)); + _high_prior_condv.notify_one(); + _normal_condv.notify_one(); + break; + } else { + std::lock_guard lock(_mtx); + for (auto it = _normal_queue.begin(); it != _normal_queue.end();) { + // 如果存在普通队列,将普通队列中的task move到优先队列 + if ((*it)->signature == signature) { + _high_prior_queue.push_back(std::move(*it)); // 复制到目标 + it = _normal_queue.erase(it); // 从源中删除,erase返回下一个有效迭代器 + _high_prior_condv.notify_one(); + _normal_condv.notify_one(); + break; + } else { + ++it; // 不满足条件,继续下一个 + } + } + // 如果存在高优队列,不需要任何操作 + LOG(INFO) << "exit already."; + } + } while (true); + + // Set the receiving time of task so that we can determine whether it is timed out later + (const_cast(task)).__set_recv_time(time(nullptr)); + + LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature); + return Status::OK(); +} + void PriorTaskWorkerPool::normal_loop() { while (true) { std::unique_ptr req; @@ -2052,8 +2099,96 @@ void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& re visible_version_req.partition_version); } +Status get_rowset_verisons_from_peer(const TReplicaInfo& addr, std::vector& peer_versions, + int64_t tablet_id) { + PGetTabletVersionsRequest request; + request.set_tablet_id(tablet_id); + PGetTabletVersionsResponse response; + std::shared_ptr stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host, + addr.brpc_port); + if (stub == nullptr) { + return Status::Aborted("get rpc stub failed, host={}, port={}", addr.host, addr.brpc_port); + } + + brpc::Controller cntl; + stub->get_tablet_rowset_versions(&cntl, &request, &response, nullptr); + if (cntl.Failed()) { + return Status::Aborted("open brpc connection failed"); + } + if (response.status().status_code() != 0) { + return Status::Aborted("peer don't have tablet"); + } + if (response.versions_size() == 0) { + return Status::Aborted("no peer version"); + } + for (int i = 0; i < response.versions_size(); ++i) { + peer_versions.emplace_back(response.versions(i).first(), response.versions(i).second()); + } + return Status::OK(); +} + +bool find_be_to_fetch(const std::unordered_map>& peers_versions, + TReplicaInfo* addr) { + return true; +} + +void clone_missing_rowset(StorageEngine& engine, const ClusterInfo* cluster_info, + const TAgentTaskRequest& req) { + std::vector missing_versions; + int64_t tablet_id = req.missing_rowset_req.tablet_id; + Version missing_version(req.missing_rowset_req.missing_rowset_start_version, + req.missing_rowset_req.missing_rowset_end_version); + missing_versions.emplace_back(missing_version); + std::vector addrs; + std::string token; + // 1. 先拿到所有的replica信息,例如一共3副本,这里的addrs会拿到其他2个副本的信息 + if (!engine.get_peers_replicas_info(tablet_id, &addrs, &token)) { + LOG(WARNING) << tablet_id << " tablet don't have peer replica"; + return; + } + + // 2. 拿到上述所有副本的rowset版本分布信息 + std::unordered_map> peers_versions; + for (const auto& addr : addrs) { + std::vector peer_versions; + auto st = get_rowset_verisons_from_peer(addr, peer_versions, tablet_id); + if (!st.ok()) { + LOG_WARNING("failed to get rowset version from peer"); + return; + } + peers_versions.insert({addr.replica_id, peer_versions}); + } + TReplicaInfo addr; + // 3. 遍历每个副本,寻找能匹配的be或者没有缺版本的be + if (!find_be_to_fetch(peers_versions, &addr)) { + LOG_WARNING("failed to find be to fetch"); + } + + // 4. fetch compaction result + auto tablet = engine.tablet_manager()->get_tablet(tablet_id); + std::vector tablet_infos; + TCloneReq clone_req; + clone_req.__set_tablet_id(tablet_id); + EngineCloneTask task(engine, clone_req, cluster_info, req.signature, &tablet_infos); + auto st = task.clone_missing_rowset_from_peer(addr, token, missing_versions, tablet.get()); + if (!st.ok()) { + LOG_WARNING("failed to get rowset version from peer"); + return; + } +} + void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info, const TAgentTaskRequest& req) { + DCHECK(req.__isset.clone_req || req.__isset.missing_rowset_req); + + if (req.__isset.missing_rowset_req) { + DorisMetrics::instance()->clone_requests_total->increment(1); + LOG(INFO) << "get missing rowset clone task. signature=" << req.signature; + clone_missing_rowset(engine, cluster_info, req); + return; + } + const auto& clone_req = req.clone_req; DorisMetrics::instance()->clone_requests_total->increment(1); diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 8d9be32b3dc017..8d992a06da2e7a 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -89,6 +89,8 @@ class PriorTaskWorkerPool final : public TaskWorkerPoolIf { Status submit_task(const TAgentTaskRequest& task) override; + Status submit_high_prior_task(const TAgentTaskRequest& task); + private: void normal_loop(); diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 0f78e885881bbd..859b49d05c9c00 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -91,5 +91,9 @@ DEFINE_mInt32(meta_service_conflict_error_retry_times, "10"); DEFINE_Bool(enable_check_storage_vault, "true"); +DEFINE_mBool(enable_compaction_clone_missing_rowset, "true"); + +DEFINE_mBool(enable_mow_publish_clone_missing_rowset, "true"); + #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 4ac3bb31c61a81..748d81aa5feb55 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -127,5 +127,9 @@ DECLARE_mInt32(meta_service_conflict_error_retry_times); DECLARE_Bool(enable_check_storage_vault); +DECLARE_mBool(enable_compaction_clone_missing_rowset); + +DECLARE_mBool(enable_mow_publish_clone_missing_rowset); + #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index a0746997128e46..021cdc47657788 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -22,14 +22,18 @@ #include #include #include +#include +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "olap/cumulative_compaction_policy.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/olap_define.h" #include "olap/rowset/rowset_meta.h" +#include "olap/storage_engine.h" #include "olap/tablet.h" +#include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/doris_metrics.h" #include "util/time.h" @@ -187,6 +191,19 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { << ", first missed version prev rowset verison=" << missing_versions[0] << ", first missed version next rowset version=" << missing_versions[1] << ", tablet=" << _tablet->tablet_id(); + if (config::enable_compaction_clone_missing_rowset) { + TAgentTaskRequest task; + TMissingRowsetReq req; + req.__set_tablet_id(0); + req.__set_missing_rowset_start_version(missing_versions[0].second + 1); + req.__set_missing_rowset_end_version(missing_versions[1].first - 1); + task.__set_task_type(TTaskType::CLONE); + task.__set_missing_rowset_req(req); + task.__set_priority(TPriority::HIGH); + PriorTaskWorkerPool* thread_pool = + ExecEnv::GetInstance()->storage_engine().to_local().missing_rowset_thread_pool; + RETURN_IF_ERROR(thread_pool->submit_high_prior_task(task)); + } } int64_t max_score = config::cumulative_compaction_max_deltas; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index a709ad1467cda6..28467513749c6e 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -779,6 +779,7 @@ void StorageEngine::_update_replica_infos_callback() { } std::unique_lock lock(_peer_replica_infos_mutex); + _tablet_replica_infos = result.tablet_replica_infos; for (const auto& it : result.tablet_replica_infos) { auto tablet_id = it.first; auto tablet = _tablet_manager->get_tablet(tablet_id); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index dcb296cce82db2..7971625d7c7c98 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -20,6 +20,7 @@ // IWYU pragma: no_include #include #include +#include #include #include #include @@ -1500,6 +1501,28 @@ bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* repli return false; } +bool StorageEngine::get_peers_replicas_info(int64_t tablet_id, std::vector* replicas, + std::string* token) { + TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); + if (tablet == nullptr) { + LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id; + return false; + } + std::unique_lock lock(_peer_replica_infos_mutex); + if (_tablet_replica_infos.contains(tablet_id)) { + std::vector reps = _tablet_replica_infos[tablet_id]; + DCHECK_NE(reps.size(), 0); + for (const auto& rep : reps) { + if (rep.replica_id != tablet->replica_id()) { + replicas->emplace_back(rep); + } + } + *token = _token; + return true; + } + return false; +} + bool StorageEngine::should_fetch_from_peer(int64_t tablet_id) { #ifdef BE_TEST if (tablet_id % 2 == 0) { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index bf4dbd65b4ad45..61d648ac43e197 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -37,6 +37,7 @@ #include #include +#include "agent/task_worker_pool.h" #include "common/config.h" #include "common/status.h" #include "olap/calc_delete_bitmap_executor.h" @@ -312,6 +313,9 @@ class StorageEngine final : public BaseStorageEngine { bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token); + bool get_peers_replicas_info(int64_t tablet_id, std::vector* replicas, + std::string* token); + bool should_fetch_from_peer(int64_t tablet_id); const std::shared_ptr& get_stream_load_recorder() { @@ -341,6 +345,8 @@ class StorageEngine final : public BaseStorageEngine { std::set get_broken_paths() { return _broken_paths; } + PriorTaskWorkerPool* missing_rowset_thread_pool; + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. @@ -519,6 +525,7 @@ class StorageEngine final : public BaseStorageEngine { // key: tabletId std::unordered_map _peer_replica_infos; std::string _token; + std::map> _tablet_replica_infos; std::atomic _wakeup_producer_flag {0}; diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index b840714633b8d9..f388d8a1003961 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include "common/config.h" #include "common/logging.h" @@ -987,4 +988,54 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet, // TODO(plat1ko): write cooldown meta to remote if this replica is cooldown replica } +Status EngineCloneTask::clone_missing_rowset_from_peer(const TReplicaInfo& addr, + const std::string& token, + const std::vector& rowset_versions, + Tablet* tablet) { + LOG(INFO) << "begin to fetch compaction result, tablet_id=" << tablet->tablet_id() + << ", addr=" << addr.host << ", version=" << rowset_versions[0]; + std::shared_lock migration_rlock(tablet->get_migration_lock(), std::try_to_lock); + if (!migration_rlock.owns_lock()) { + return Status::Error("got migration_rlock failed. tablet={}", + tablet->tablet_id()); + } + + std::string local_data_path = tablet->tablet_path() + CLONE_PREFIX; + std::string local_path = local_data_path + "/"; + std::string snapshot_path; + int timeout_s = 0; + bool is_incremental_clone = false; + // 1: make snapshot + RETURN_IF_ERROR(_make_snapshot(addr.host, addr.be_port, tablet->tablet_id(), + tablet->schema_hash(), timeout_s, rowset_versions, + &snapshot_path, &is_incremental_clone)); + Defer defer {[&, this] { + // TODO(plat1ko): Async release snapshot + auto st = _release_snapshot(addr.host, addr.be_port, snapshot_path); + if (!st.ok()) [[unlikely]] { + LOG_WARNING("failed to release snapshot in remote BE") + .tag("host", addr.host) + .tag("port", addr.be_port) + .tag("snapshot_path", snapshot_path) + .error(st); + } + }}; + // 2: download snapshot + std::string remote_url_prefix; + { + std::stringstream ss; + ss << "http://" << addr.host << ":" << addr.http_port << HTTP_REQUEST_PREFIX + << HTTP_REQUEST_TOKEN_PARAM << token << HTTP_REQUEST_FILE_PARAM << snapshot_path << "/" + << tablet->tablet_id() << "/" << tablet->schema_hash() << "/"; + remote_url_prefix = ss.str(); + } + RETURN_IF_ERROR(_download_files(tablet->data_dir(), remote_url_prefix, local_path)); + _pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids( + local_path, tablet->tablet_id(), tablet->replica_id(), tablet->table_id(), + tablet->partition_id(), tablet->schema_hash())); + // 4: finish_clone: create output_rowset and link file + return _finish_clone(tablet, local_data_path, rowset_versions.back().second, + is_incremental_clone); +} + } // namespace doris diff --git a/be/src/olap/task/engine_clone_task.h b/be/src/olap/task/engine_clone_task.h index 9a5b6db445a8d1..6ccba85656bdd5 100644 --- a/be/src/olap/task/engine_clone_task.h +++ b/be/src/olap/task/engine_clone_task.h @@ -59,6 +59,9 @@ class EngineCloneTask final : public EngineTask { int64_t get_copy_size() const { return _copy_size; } int64_t get_copy_time_ms() const { return _copy_time_ms; } + Status clone_missing_rowset_from_peer(const TReplicaInfo& addr, const std::string& token, + const std::vector& rowset_versions, + Tablet* tablet); private: Status _do_clone(); diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 2dcc1723b71005..87c1d49a585ec6 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -32,6 +32,7 @@ #include #include +#include "cloud/config.h" #include "common/logging.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" @@ -216,6 +217,24 @@ Status EnginePublishVersionTask::execute() { continue; } auto handle_version_not_continuous = [&]() { + if (config::enable_compaction_clone_missing_rowset) { + TAgentTaskRequest task; + TMissingRowsetReq req; + req.__set_tablet_id(0); + req.__set_missing_rowset_start_version(max_version + 1); + req.__set_missing_rowset_end_version(version.first - 1); + task.__set_task_type(TTaskType::CLONE); + task.__set_missing_rowset_req(req); + task.__set_priority(TPriority::HIGH); + PriorTaskWorkerPool* thread_pool = ExecEnv::GetInstance() + ->storage_engine() + .to_local() + .missing_rowset_thread_pool; + auto st = thread_pool->submit_high_prior_task(task); + if (!st.ok()) { + LOG_WARNING("mow clone missing rowset fail"); + } + } add_error_tablet_id(tablet_info.tablet_id); // When there are too many missing versions, do not directly retry the // publish and handle it through async publish. diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index bfc1c34e8f16d7..43fe1e4df473d8 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -22,6 +22,7 @@ #include +#include #include #include #include From 68f4da68cbc92cc83afc53005325ad0d6ce025df Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 7 Jul 2025 16:32:43 +0800 Subject: [PATCH 2/6] 2 --- be/src/agent/task_worker_pool.cpp | 97 +------------------ be/src/agent/task_worker_pool.h | 2 +- be/src/olap/cumulative_compaction.cpp | 23 +++-- be/src/olap/storage_engine.cpp | 18 +++- be/src/olap/storage_engine.h | 3 +- be/src/olap/task/engine_clone_task.cpp | 51 ---------- be/src/olap/task/engine_clone_task.h | 3 - .../olap/task/engine_publish_version_task.cpp | 22 +++-- .../doris/service/FrontendServiceImpl.java | 2 + gensrc/thrift/Types.thrift | 2 + 10 files changed, 58 insertions(+), 165 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 81d9a4161c794c..6630b1c0acd442 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -611,7 +611,7 @@ Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) { }); } -Status PriorTaskWorkerPool::submit_high_prior_task(const TAgentTaskRequest& task) { +Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskRequest& task) { const TTaskType::type task_type = task.task_type; int64_t signature = task.signature; std::string type_str; @@ -624,6 +624,7 @@ Status PriorTaskWorkerPool::submit_high_prior_task(const TAgentTaskRequest& task auto& set = s_task_signatures[task_type]; if (!set.contains(signature)) { // 如果不存在,直接放到优先队列 + add_task_count(*req, 1); set.insert(signature); std::lock_guard lock(_mtx); _high_prior_queue.push_back(std::move(req)); @@ -633,10 +634,10 @@ Status PriorTaskWorkerPool::submit_high_prior_task(const TAgentTaskRequest& task } else { std::lock_guard lock(_mtx); for (auto it = _normal_queue.begin(); it != _normal_queue.end();) { - // 如果存在普通队列,将普通队列中的task move到优先队列 + // 如果存在普通队列,将普通队列中的task cancel if ((*it)->signature == signature) { - _high_prior_queue.push_back(std::move(*it)); // 复制到目标 - it = _normal_queue.erase(it); // 从源中删除,erase返回下一个有效迭代器 + _normal_queue.erase(it); // cancel 原来的任务 + _high_prior_queue.push_back(std::move(req)); // 将新任务添加到队列 _high_prior_condv.notify_one(); _normal_condv.notify_one(); break; @@ -2099,96 +2100,8 @@ void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& re visible_version_req.partition_version); } -Status get_rowset_verisons_from_peer(const TReplicaInfo& addr, std::vector& peer_versions, - int64_t tablet_id) { - PGetTabletVersionsRequest request; - request.set_tablet_id(tablet_id); - PGetTabletVersionsResponse response; - std::shared_ptr stub = - ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host, - addr.brpc_port); - if (stub == nullptr) { - return Status::Aborted("get rpc stub failed, host={}, port={}", addr.host, addr.brpc_port); - } - - brpc::Controller cntl; - stub->get_tablet_rowset_versions(&cntl, &request, &response, nullptr); - if (cntl.Failed()) { - return Status::Aborted("open brpc connection failed"); - } - if (response.status().status_code() != 0) { - return Status::Aborted("peer don't have tablet"); - } - if (response.versions_size() == 0) { - return Status::Aborted("no peer version"); - } - for (int i = 0; i < response.versions_size(); ++i) { - peer_versions.emplace_back(response.versions(i).first(), response.versions(i).second()); - } - return Status::OK(); -} - -bool find_be_to_fetch(const std::unordered_map>& peers_versions, - TReplicaInfo* addr) { - return true; -} - -void clone_missing_rowset(StorageEngine& engine, const ClusterInfo* cluster_info, - const TAgentTaskRequest& req) { - std::vector missing_versions; - int64_t tablet_id = req.missing_rowset_req.tablet_id; - Version missing_version(req.missing_rowset_req.missing_rowset_start_version, - req.missing_rowset_req.missing_rowset_end_version); - missing_versions.emplace_back(missing_version); - std::vector addrs; - std::string token; - // 1. 先拿到所有的replica信息,例如一共3副本,这里的addrs会拿到其他2个副本的信息 - if (!engine.get_peers_replicas_info(tablet_id, &addrs, &token)) { - LOG(WARNING) << tablet_id << " tablet don't have peer replica"; - return; - } - - // 2. 拿到上述所有副本的rowset版本分布信息 - std::unordered_map> peers_versions; - for (const auto& addr : addrs) { - std::vector peer_versions; - auto st = get_rowset_verisons_from_peer(addr, peer_versions, tablet_id); - if (!st.ok()) { - LOG_WARNING("failed to get rowset version from peer"); - return; - } - peers_versions.insert({addr.replica_id, peer_versions}); - } - TReplicaInfo addr; - // 3. 遍历每个副本,寻找能匹配的be或者没有缺版本的be - if (!find_be_to_fetch(peers_versions, &addr)) { - LOG_WARNING("failed to find be to fetch"); - } - - // 4. fetch compaction result - auto tablet = engine.tablet_manager()->get_tablet(tablet_id); - std::vector tablet_infos; - TCloneReq clone_req; - clone_req.__set_tablet_id(tablet_id); - EngineCloneTask task(engine, clone_req, cluster_info, req.signature, &tablet_infos); - auto st = task.clone_missing_rowset_from_peer(addr, token, missing_versions, tablet.get()); - if (!st.ok()) { - LOG_WARNING("failed to get rowset version from peer"); - return; - } -} - void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info, const TAgentTaskRequest& req) { - DCHECK(req.__isset.clone_req || req.__isset.missing_rowset_req); - - if (req.__isset.missing_rowset_req) { - DorisMetrics::instance()->clone_requests_total->increment(1); - LOG(INFO) << "get missing rowset clone task. signature=" << req.signature; - clone_missing_rowset(engine, cluster_info, req); - return; - } - const auto& clone_req = req.clone_req; DorisMetrics::instance()->clone_requests_total->increment(1); diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 8d992a06da2e7a..b64d84fb5c4a19 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -89,7 +89,7 @@ class PriorTaskWorkerPool final : public TaskWorkerPoolIf { Status submit_task(const TAgentTaskRequest& task) override; - Status submit_high_prior_task(const TAgentTaskRequest& task); + Status submit_high_prior_and_cancel_low(const TAgentTaskRequest& task); private: void normal_loop(); diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 021cdc47657788..c80161481f5163 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -18,6 +18,8 @@ #include "olap/cumulative_compaction.h" #include +#include +#include #include #include @@ -192,17 +194,26 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { << ", first missed version next rowset version=" << missing_versions[1] << ", tablet=" << _tablet->tablet_id(); if (config::enable_compaction_clone_missing_rowset) { + std::vector backends; + if (!_engine.get_peers_replica_backends(_tablet->tablet_id(), &backends)) { + LOG(WARNING) << _tablet->tablet_id() << " tablet don't have peer replica backends"; + return Status::InternalError(""); + } TAgentTaskRequest task; - TMissingRowsetReq req; - req.__set_tablet_id(0); - req.__set_missing_rowset_start_version(missing_versions[0].second + 1); - req.__set_missing_rowset_end_version(missing_versions[1].first - 1); + TCloneReq req; + req.__set_tablet_id(_tablet->tablet_id()); + req.__set_schema_hash(_tablet->schema_hash()); + req.__set_src_backends(backends); + req.__set_version(missing_versions.back().first); + req.__set_replica_id(tablet()->replica_id()); + req.__set_partition_id(_tablet->partition_id()); + req.__set_table_id(_tablet->table_id()); task.__set_task_type(TTaskType::CLONE); - task.__set_missing_rowset_req(req); + task.__set_clone_req(req); task.__set_priority(TPriority::HIGH); PriorTaskWorkerPool* thread_pool = ExecEnv::GetInstance()->storage_engine().to_local().missing_rowset_thread_pool; - RETURN_IF_ERROR(thread_pool->submit_high_prior_task(task)); + RETURN_IF_ERROR(thread_pool->submit_high_prior_and_cancel_low(task)); } } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 7971625d7c7c98..260ff7844de186 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -20,6 +20,7 @@ // IWYU pragma: no_include #include #include +#include #include #include #include @@ -1501,8 +1502,7 @@ bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* repli return false; } -bool StorageEngine::get_peers_replicas_info(int64_t tablet_id, std::vector* replicas, - std::string* token) { +bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vector* backends) { TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); if (tablet == nullptr) { LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id; @@ -1514,10 +1514,20 @@ bool StorageEngine::get_peers_replicas_info(int64_t tablet_id, std::vectorreplica_id()) { - replicas->emplace_back(rep); + TBackend backend; + backend.__set_host(rep.host); + backend.__set_be_port(rep.be_port); + backend.__set_http_port(rep.http_port); + backend.__set_brpc_port(rep.brpc_port); + if (rep.__isset.is_alive) { + backend.__set_is_alive(rep.is_alive); + } + if (rep.__isset.backend_id) { + backend.__set_id(rep.backend_id); + } + backends->emplace_back(backend); } } - *token = _token; return true; } return false; diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 61d648ac43e197..e1c5168ca2af84 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -313,8 +313,7 @@ class StorageEngine final : public BaseStorageEngine { bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token); - bool get_peers_replicas_info(int64_t tablet_id, std::vector* replicas, - std::string* token); + bool get_peers_replica_backends(int64_t tablet_id, std::vector* backends); bool should_fetch_from_peer(int64_t tablet_id); diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index f388d8a1003961..db0ad79f51f05b 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -987,55 +987,4 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet, return tablet->revise_tablet_meta(to_add, to_delete, false); // TODO(plat1ko): write cooldown meta to remote if this replica is cooldown replica } - -Status EngineCloneTask::clone_missing_rowset_from_peer(const TReplicaInfo& addr, - const std::string& token, - const std::vector& rowset_versions, - Tablet* tablet) { - LOG(INFO) << "begin to fetch compaction result, tablet_id=" << tablet->tablet_id() - << ", addr=" << addr.host << ", version=" << rowset_versions[0]; - std::shared_lock migration_rlock(tablet->get_migration_lock(), std::try_to_lock); - if (!migration_rlock.owns_lock()) { - return Status::Error("got migration_rlock failed. tablet={}", - tablet->tablet_id()); - } - - std::string local_data_path = tablet->tablet_path() + CLONE_PREFIX; - std::string local_path = local_data_path + "/"; - std::string snapshot_path; - int timeout_s = 0; - bool is_incremental_clone = false; - // 1: make snapshot - RETURN_IF_ERROR(_make_snapshot(addr.host, addr.be_port, tablet->tablet_id(), - tablet->schema_hash(), timeout_s, rowset_versions, - &snapshot_path, &is_incremental_clone)); - Defer defer {[&, this] { - // TODO(plat1ko): Async release snapshot - auto st = _release_snapshot(addr.host, addr.be_port, snapshot_path); - if (!st.ok()) [[unlikely]] { - LOG_WARNING("failed to release snapshot in remote BE") - .tag("host", addr.host) - .tag("port", addr.be_port) - .tag("snapshot_path", snapshot_path) - .error(st); - } - }}; - // 2: download snapshot - std::string remote_url_prefix; - { - std::stringstream ss; - ss << "http://" << addr.host << ":" << addr.http_port << HTTP_REQUEST_PREFIX - << HTTP_REQUEST_TOKEN_PARAM << token << HTTP_REQUEST_FILE_PARAM << snapshot_path << "/" - << tablet->tablet_id() << "/" << tablet->schema_hash() << "/"; - remote_url_prefix = ss.str(); - } - RETURN_IF_ERROR(_download_files(tablet->data_dir(), remote_url_prefix, local_path)); - _pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids( - local_path, tablet->tablet_id(), tablet->replica_id(), tablet->table_id(), - tablet->partition_id(), tablet->schema_hash())); - // 4: finish_clone: create output_rowset and link file - return _finish_clone(tablet, local_data_path, rowset_versions.back().second, - is_incremental_clone); -} - } // namespace doris diff --git a/be/src/olap/task/engine_clone_task.h b/be/src/olap/task/engine_clone_task.h index 6ccba85656bdd5..9a5b6db445a8d1 100644 --- a/be/src/olap/task/engine_clone_task.h +++ b/be/src/olap/task/engine_clone_task.h @@ -59,9 +59,6 @@ class EngineCloneTask final : public EngineTask { int64_t get_copy_size() const { return _copy_size; } int64_t get_copy_time_ms() const { return _copy_time_ms; } - Status clone_missing_rowset_from_peer(const TReplicaInfo& addr, const std::string& token, - const std::vector& rowset_versions, - Tablet* tablet); private: Status _do_clone(); diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 87c1d49a585ec6..57498484de7851 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -218,19 +218,29 @@ Status EnginePublishVersionTask::execute() { } auto handle_version_not_continuous = [&]() { if (config::enable_compaction_clone_missing_rowset) { + std::vector backends; + if (!_engine.get_peers_replica_backends(tablet->tablet_id(), + &backends)) { + LOG(WARNING) << tablet->tablet_id() + << " tablet don't have peer replica backends"; + } TAgentTaskRequest task; - TMissingRowsetReq req; - req.__set_tablet_id(0); - req.__set_missing_rowset_start_version(max_version + 1); - req.__set_missing_rowset_end_version(version.first - 1); + TCloneReq req; + req.__set_tablet_id(tablet->tablet_id()); + req.__set_schema_hash(tablet->schema_hash()); + req.__set_src_backends(backends); + req.__set_version(version.first - 1); + req.__set_replica_id(tablet->replica_id()); + req.__set_partition_id(tablet->partition_id()); + req.__set_table_id(tablet->table_id()); task.__set_task_type(TTaskType::CLONE); - task.__set_missing_rowset_req(req); + task.__set_clone_req(req); task.__set_priority(TPriority::HIGH); PriorTaskWorkerPool* thread_pool = ExecEnv::GetInstance() ->storage_engine() .to_local() .missing_rowset_thread_pool; - auto st = thread_pool->submit_high_prior_task(task); + auto st = thread_pool->submit_high_prior_and_cancel_low(task); if (!st.ok()) { LOG_WARNING("mow clone missing rowset fail"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9610afe22dd5e5..adfa8237cb3020 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2741,6 +2741,8 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos replicaInfo.setBePort(backend.getBePort()); replicaInfo.setHttpPort(backend.getHttpPort()); replicaInfo.setBrpcPort(backend.getBrpcPort()); + replicaInfo.setIsAlive(backend.isAlive()); + replicaInfo.setBackendId(backend.getId()); replicaInfo.setReplicaId(replica.getId()); replicaInfos.add(replicaInfo); } diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 4fe2e1ab0825b1..54436bbc130969 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -670,6 +670,8 @@ struct TReplicaInfo { 3: required TPort http_port 4: required TPort brpc_port 5: required TReplicaId replica_id + 6: optional bool is_alive + 7: optional i64 backend_id } struct TResourceInfo { From d5ac05fbddb4ad7af5e0c01e7ed6344ab03ac3a8 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 8 Jul 2025 21:05:50 +0800 Subject: [PATCH 3/6] 3 --- be/src/agent/agent_server.h | 2 - be/src/agent/task_worker_pool.cpp | 2 +- be/src/cloud/config.cpp | 4 +- be/src/olap/cumulative_compaction.cpp | 7 + be/src/olap/olap_server.cpp | 1 - be/src/olap/storage_engine.cpp | 49 ++++++- be/src/olap/storage_engine.h | 3 +- .../olap/task/engine_publish_version_task.cpp | 9 +- ...lone_missing_rowset_fault_injection.groovy | 112 +++++++++++++++ ...st_mow_publish_clone_missing_rowset.groovy | 134 ++++++++++++++++++ 10 files changed, 313 insertions(+), 10 deletions(-) create mode 100644 regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy create mode 100644 regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index 01333f2881cf85..e5b5b522ba0223 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -24,8 +24,6 @@ #include #include -#include "util/work_thread_pool.hpp" - namespace doris { class TaskWorkerPoolIf; diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 6630b1c0acd442..2af747a0264af0 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -646,7 +646,7 @@ Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskReq } } // 如果存在高优队列,不需要任何操作 - LOG(INFO) << "exit already."; + LOG_INFO("task has already existed in high prior queue.").tag("signature", signature); } } while (true); diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 859b49d05c9c00..7dc435f7a155dc 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -91,9 +91,9 @@ DEFINE_mInt32(meta_service_conflict_error_retry_times, "10"); DEFINE_Bool(enable_check_storage_vault, "true"); -DEFINE_mBool(enable_compaction_clone_missing_rowset, "true"); +DEFINE_mBool(enable_compaction_clone_missing_rowset, "false"); -DEFINE_mBool(enable_mow_publish_clone_missing_rowset, "true"); +DEFINE_mBool(enable_mow_publish_clone_missing_rowset, "false"); #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index c80161481f5163..e0e1de34d1706d 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -211,8 +211,15 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { task.__set_task_type(TTaskType::CLONE); task.__set_clone_req(req); task.__set_priority(TPriority::HIGH); + task.__set_signature(_tablet->tablet_id()); PriorTaskWorkerPool* thread_pool = ExecEnv::GetInstance()->storage_engine().to_local().missing_rowset_thread_pool; + LOG_INFO("cumulative compaction submit missing rowset clone task.") + .tag("tablet_id", _tablet->tablet_id()) + .tag("version", missing_versions.back().first) + .tag("replica_id", tablet()->replica_id()) + .tag("partition_id", _tablet->partition_id()) + .tag("table_id", _tablet->table_id()); RETURN_IF_ERROR(thread_pool->submit_high_prior_and_cancel_low(task)); } } diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 28467513749c6e..a709ad1467cda6 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -779,7 +779,6 @@ void StorageEngine::_update_replica_infos_callback() { } std::unique_lock lock(_peer_replica_infos_mutex); - _tablet_replica_infos = result.tablet_replica_infos; for (const auto& it : result.tablet_replica_infos) { auto tablet_id = it.first; auto tablet = _tablet_manager->get_tablet(tablet_id); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 260ff7844de186..099f4bb802724a 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -51,6 +51,7 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "gen_cpp/FrontendService.h" #include "io/fs/local_file_system.h" #include "olap/binlog.h" #include "olap/data_dir.h" @@ -69,6 +70,7 @@ #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/txn_manager.h" +#include "runtime/client_cache.h" #include "runtime/stream_load/stream_load_recorder.h" #include "util/doris_metrics.h" #include "util/mem_info.h" @@ -76,6 +78,7 @@ #include "util/stopwatch.hpp" #include "util/thread.h" #include "util/threadpool.h" +#include "util/thrift_rpc_helper.h" #include "util/uid_util.h" #include "util/work_thread_pool.hpp" @@ -1508,9 +1511,42 @@ bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vectorcluster_info(); + if (cluster_info == nullptr) { + LOG(WARNING) << "Have not get FE Master heartbeat yet"; + return false; + } + TNetworkAddress master_addr = cluster_info->master_fe_addr; + if (master_addr.hostname.empty() || master_addr.port == 0) { + LOG(WARNING) << "Have not get FE Master heartbeat yet"; + return false; + } + TGetTabletReplicaInfosRequest request; + TGetTabletReplicaInfosResult result; + request.tablet_ids.emplace_back(tablet_id); + Status rpc_st = ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->getTabletReplicaInfos(result, request); + }); + + if (!rpc_st.ok()) { + LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, " + "tablet id: " + << tablet_id; + return false; + } std::unique_lock lock(_peer_replica_infos_mutex); - if (_tablet_replica_infos.contains(tablet_id)) { - std::vector reps = _tablet_replica_infos[tablet_id]; + if (result.tablet_replica_infos.contains(tablet_id)) { + std::vector reps = result.tablet_replica_infos[tablet_id]; DCHECK_NE(reps.size(), 0); for (const auto& rep : reps) { if (rep.replica_id != tablet->replica_id()) { @@ -1526,8 +1562,17 @@ bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vectoremplace_back(backend); + std::stringstream backend_string; + backend.printTo(backend_string); + LOG_INFO("get 1 peer replica backend info.") + .tag("tablet id", tablet_id) + .tag("backend info", backend_string.str()); } } + _last_get_peers_replica_backends_time_ms = UnixMillis(); + LOG_INFO("succeed get peers replica backends info.") + .tag("tablet id", tablet_id) + .tag("replica num", backends->size()); return true; } return false; diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index e1c5168ca2af84..bfbe0e93d4a401 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -524,7 +524,6 @@ class StorageEngine final : public BaseStorageEngine { // key: tabletId std::unordered_map _peer_replica_infos; std::string _token; - std::map> _tablet_replica_infos; std::atomic _wakeup_producer_flag {0}; @@ -567,6 +566,8 @@ class StorageEngine final : public BaseStorageEngine { // thread to check tablet delete bitmap count tasks scoped_refptr _check_delete_bitmap_score_thread; + + int64_t _last_get_peers_replica_backends_time_ms {0}; }; // lru cache for create tabelt round robin in disks diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 57498484de7851..818d9192141b42 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -217,7 +217,7 @@ Status EnginePublishVersionTask::execute() { continue; } auto handle_version_not_continuous = [&]() { - if (config::enable_compaction_clone_missing_rowset) { + if (config::enable_mow_publish_clone_missing_rowset) { std::vector backends; if (!_engine.get_peers_replica_backends(tablet->tablet_id(), &backends)) { @@ -236,10 +236,17 @@ Status EnginePublishVersionTask::execute() { task.__set_task_type(TTaskType::CLONE); task.__set_clone_req(req); task.__set_priority(TPriority::HIGH); + task.__set_signature(tablet->tablet_id()); PriorTaskWorkerPool* thread_pool = ExecEnv::GetInstance() ->storage_engine() .to_local() .missing_rowset_thread_pool; + LOG_INFO("cumulative compaction submit missing rowset clone task.") + .tag("tablet_id", tablet->tablet_id()) + .tag("version", version.first - 1) + .tag("replica_id", tablet->replica_id()) + .tag("partition_id", tablet->partition_id()) + .tag("table_id", tablet->table_id()); auto st = thread_pool->submit_high_prior_and_cancel_low(task); if (!st.ok()) { LOG_WARNING("mow clone missing rowset fail"); diff --git a/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy new file mode 100644 index 00000000000000..8b23a8399ee1b6 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_compaction_clone_missing_rowset_fault_injection', 'docker') { + def options = new ClusterOptions() + options.cloudMode = false + options.enableDebugPoints() + options.feConfigs += [ "disable_tablet_scheduler=true" ] + options.beConfigs += [ "enable_compaction_clone_missing_rowset=true" ] + options.beNum = 3 + docker(options) { + + def injectBe = null + def normalBe = null + def backends = sql_return_maparray('show backends') + + injectBe = backends[0] + assertNotNull(injectBe) + normalBe = backends[1] + assertNotNull(normalBe) + + try { + def tableName = "test_compaction_clone_missing_rowset" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DUPLICATE KEY(k) + DISTRIBUTED BY HASH(k) + BUCKETS 1 + properties( + "replication_num" = "3", + "disable_auto_compaction" = "true") + """ + sql """ INSERT INTO ${tableName} VALUES (1,0)""" + DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"]) + sql """ INSERT INTO ${tableName} VALUES (2,0)""" + sql """ INSERT INTO ${tableName} VALUES (3,0)""" + sql """ INSERT INTO ${tableName} VALUES (4,0)""" + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + sql """ INSERT INTO ${tableName} VALUES (5,0)""" + + def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}") + def tabletId = array[0].TabletId + + // 1st check rowsets + logger.info("1st show:" + tabletId) + def (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("1st show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + // missing rowset [3-5] + assertTrue(out.contains("[3-5]")) + assertTrue(out.contains("[6-6]")) + + logger.info("1st run cumu compaction:" + tabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("1st Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + sleep(10000) + + // 2nd check rowsets + logger.info("2nd show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("2nd show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + + logger.info("2nd cumu compaction:" + tabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("2nd cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + // check rowsets + logger.info("3rd show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("3rd show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-6]")) + + } finally { + if (injectBe != null) { + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + } + } + } +} diff --git a/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy new file mode 100644 index 00000000000000..000b411973e673 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_mow_publish_clone_missing_rowset_fault_injection', 'docker') { + + def set_be_param = { paramName, paramValue, beIp, bePort -> + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + + def options = new ClusterOptions() + options.cloudMode = false + options.enableDebugPoints() + options.feConfigs += [ "disable_tablet_scheduler=true" ] + options.beConfigs += [ "enable_mow_publish_clone_missing_rowset=false" ] + options.beNum = 3 + docker(options) { + + def injectBe = null + def normalBe = null + def backends = sql_return_maparray('show backends') + + injectBe = backends[0] + assertNotNull(injectBe) + normalBe = backends[1] + assertNotNull(normalBe) + + try { + def tableName = "test_mow_publish_clone_missing_rowset" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) + BUCKETS 1 + properties( + "replication_num" = "3", + "disable_auto_compaction" = "true") + """ + sql """ INSERT INTO ${tableName} VALUES (1,0)""" + DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"]) + sql """ INSERT INTO ${tableName} VALUES (2,0)""" + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + sql """ INSERT INTO ${tableName} VALUES (3,0)""" + sql """ INSERT INTO ${tableName} VALUES (4,0)""" + sql """ INSERT INTO ${tableName} VALUES (5,0)""" + + def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}") + def tabletId = array[0].TabletId + + // normal be check rowsets + logger.info("normal be show:" + tabletId) + def (code, out, err) = be_show_tablet_status(normalBe.Host, normalBe.HttpPort, tabletId) + logger.info("normal be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + + // 1st inject be check rowsets + logger.info("1st inject be show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("1st inject be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertFalse(out.contains("[3-3]")) + assertFalse(out.contains("[4-4]")) + assertFalse(out.contains("[5-5]")) + assertFalse(out.contains("[6-6]")) + + set_be_param("enable_mow_publish_clone_missing_rowset", "true", injectBe.Host, injectBe.HttpPort); + Thread.sleep(10000) + // submit clone task + sql """ INSERT INTO ${tableName} VALUES (6,0)""" + + sleep(10000) + + // 2nd inject be check rowsets + logger.info("2nd inject be show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("2nd inject be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + assertTrue(out.contains("[7-7]")) + + sql """ INSERT INTO ${tableName} VALUES (7,0)""" + + // 3rd inject be check rowsets + logger.info("3rd inject be show:" + tabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("3rd inject be show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + assertTrue(out.contains("[7-7]")) + assertTrue(out.contains("[8-8]")) + } finally { + if (injectBe != null) { + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random") + } + } + } +} \ No newline at end of file From 2a1907e9dc36d89cb60d9d362eca015857b345dc Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 9 Jul 2025 21:43:41 +0800 Subject: [PATCH 4/6] 4 --- be/src/agent/agent_server.cpp | 3 +- be/src/agent/task_worker_pool.cpp | 12 +++--- be/src/cloud/config.cpp | 4 -- be/src/cloud/config.h | 4 -- be/src/common/config.cpp | 4 ++ be/src/common/config.h | 4 ++ be/src/olap/cumulative_compaction.cpp | 34 +++++----------- be/src/olap/storage_engine.cpp | 32 +++++++++++++++ be/src/olap/storage_engine.h | 4 +- .../olap/task/engine_publish_version_task.cpp | 40 ++++++------------- ...lone_missing_rowset_fault_injection.groovy | 2 +- ...st_mow_publish_clone_missing_rowset.groovy | 4 +- 12 files changed, 76 insertions(+), 71 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 38e4d6a59cbc67..998456a500d071 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -200,8 +200,7 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) { "REPORT_INDEX_POLICY", _cluster_info, config::report_index_policy_interval_seconds,[&cluster_info = _cluster_info] { report_index_policy_callback(cluster_info); })); // clang-format on - exec_env->storage_engine().to_local().missing_rowset_thread_pool = - static_cast(_workers[TTaskType::CLONE].get()); + exec_env->storage_engine().to_local().workers = &_workers; } void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env) { diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 2af747a0264af0..026d6b91dc2bc7 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -623,7 +623,7 @@ Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskReq std::lock_guard lock(s_task_signatures_mtx); auto& set = s_task_signatures[task_type]; if (!set.contains(signature)) { - // 如果不存在,直接放到优先队列 + // If it doesn't exist, put it directly into the priority queue add_task_count(*req, 1); set.insert(signature); std::lock_guard lock(_mtx); @@ -634,18 +634,18 @@ Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskReq } else { std::lock_guard lock(_mtx); for (auto it = _normal_queue.begin(); it != _normal_queue.end();) { - // 如果存在普通队列,将普通队列中的task cancel + // If it exists in the normal queue, cancel the task in the normal queue if ((*it)->signature == signature) { - _normal_queue.erase(it); // cancel 原来的任务 - _high_prior_queue.push_back(std::move(req)); // 将新任务添加到队列 + _normal_queue.erase(it); // cancel the original task + _high_prior_queue.push_back(std::move(req)); // add the new task to the queue _high_prior_condv.notify_one(); _normal_condv.notify_one(); break; } else { - ++it; // 不满足条件,继续下一个 + ++it; // doesn't meet the condition, continue to the next one } } - // 如果存在高优队列,不需要任何操作 + // If it exists in the high priority queue, no operation is needed LOG_INFO("task has already existed in high prior queue.").tag("signature", signature); } } while (true); diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 7dc435f7a155dc..0f78e885881bbd 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -91,9 +91,5 @@ DEFINE_mInt32(meta_service_conflict_error_retry_times, "10"); DEFINE_Bool(enable_check_storage_vault, "true"); -DEFINE_mBool(enable_compaction_clone_missing_rowset, "false"); - -DEFINE_mBool(enable_mow_publish_clone_missing_rowset, "false"); - #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 748d81aa5feb55..4ac3bb31c61a81 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -127,9 +127,5 @@ DECLARE_mInt32(meta_service_conflict_error_retry_times); DECLARE_Bool(enable_check_storage_vault); -DECLARE_mBool(enable_compaction_clone_missing_rowset); - -DECLARE_mBool(enable_mow_publish_clone_missing_rowset); - #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d63570ff4dffa9..15b7dc92d83227 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1532,6 +1532,10 @@ DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1"); // ATTENTION: for test only, use random segments key bounds truncation threshold every time DEFINE_mBool(random_segments_key_bounds_truncation, "false"); +DEFINE_mBool(enable_auto_clone_on_compaction_missing_version, "false"); + +DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index a867bfccfada96..71e5ae1f90ea8d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1603,6 +1603,10 @@ DECLARE_mInt32(segments_key_bounds_truncation_threshold); // ATTENTION: for test only, use random segments key bounds truncation threshold every time DECLARE_mBool(random_segments_key_bounds_truncation); +DECLARE_mBool(enable_auto_clone_on_compaction_missing_version); + +DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index e0e1de34d1706d..8cd41be388dbe6 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -26,7 +26,6 @@ #include #include -#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "olap/cumulative_compaction_policy.h" @@ -193,34 +192,23 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { << ", first missed version prev rowset verison=" << missing_versions[0] << ", first missed version next rowset version=" << missing_versions[1] << ", tablet=" << _tablet->tablet_id(); - if (config::enable_compaction_clone_missing_rowset) { - std::vector backends; - if (!_engine.get_peers_replica_backends(_tablet->tablet_id(), &backends)) { - LOG(WARNING) << _tablet->tablet_id() << " tablet don't have peer replica backends"; - return Status::InternalError(""); - } - TAgentTaskRequest task; - TCloneReq req; - req.__set_tablet_id(_tablet->tablet_id()); - req.__set_schema_hash(_tablet->schema_hash()); - req.__set_src_backends(backends); - req.__set_version(missing_versions.back().first); - req.__set_replica_id(tablet()->replica_id()); - req.__set_partition_id(_tablet->partition_id()); - req.__set_table_id(_tablet->table_id()); - task.__set_task_type(TTaskType::CLONE); - task.__set_clone_req(req); - task.__set_priority(TPriority::HIGH); - task.__set_signature(_tablet->tablet_id()); - PriorTaskWorkerPool* thread_pool = - ExecEnv::GetInstance()->storage_engine().to_local().missing_rowset_thread_pool; + if (config::enable_auto_clone_on_compaction_missing_version) { LOG_INFO("cumulative compaction submit missing rowset clone task.") .tag("tablet_id", _tablet->tablet_id()) .tag("version", missing_versions.back().first) .tag("replica_id", tablet()->replica_id()) .tag("partition_id", _tablet->partition_id()) .tag("table_id", _tablet->table_id()); - RETURN_IF_ERROR(thread_pool->submit_high_prior_and_cancel_low(task)); + Status st = _engine.submit_clone_task(tablet(), missing_versions.back().first); + if (!st) { + LOG_WARNING("cumulative compaction failed to submit missing rowset clone task.") + .tag("st", st.to_string()) + .tag("tablet_id", _tablet->tablet_id()) + .tag("version", missing_versions.back().first) + .tag("replica_id", tablet()->replica_id()) + .tag("partition_id", _tablet->partition_id()) + .tag("table_id", _tablet->table_id()); + } } } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 099f4bb802724a..8fd6c4efc5c24b 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -81,6 +82,7 @@ #include "util/thrift_rpc_helper.h" #include "util/uid_util.h" #include "util/work_thread_pool.hpp" +#include "vec/common/assert_cast.h" using std::filesystem::directory_iterator; using std::filesystem::path; @@ -1695,6 +1697,36 @@ Status StorageEngine::_persist_broken_paths() { return Status::OK(); } +Status StorageEngine::submit_clone_task(Tablet* tablet, int64_t version) { + std::vector backends; + if (!get_peers_replica_backends(tablet->tablet_id(), &backends)) { + LOG(WARNING) << tablet->tablet_id() << " tablet doesn't have peer replica backends"; + return Status::InternalError(""); + } + TAgentTaskRequest task; + TCloneReq req; + req.__set_tablet_id(tablet->tablet_id()); + req.__set_schema_hash(tablet->schema_hash()); + req.__set_src_backends(backends); + req.__set_version(version); + req.__set_replica_id(tablet->replica_id()); + req.__set_partition_id(tablet->partition_id()); + req.__set_table_id(tablet->table_id()); + task.__set_task_type(TTaskType::CLONE); + task.__set_clone_req(req); + task.__set_priority(TPriority::HIGH); + task.__set_signature(tablet->tablet_id()); + LOG_INFO("BE start to submit missing rowset clone task.") + .tag("tablet_id", tablet->tablet_id()) + .tag("version", version) + .tag("replica_id", tablet->replica_id()) + .tag("partition_id", tablet->partition_id()) + .tag("table_id", tablet->table_id()); + RETURN_IF_ERROR(assert_cast(workers->at(TTaskType::CLONE).get()) + ->submit_high_prior_and_cancel_low(task)); + return Status::OK(); +} + int CreateTabletRRIdxCache::get_index(const std::string& key) { auto* lru_handle = lookup(key); if (lru_handle) { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index bfbe0e93d4a401..f2350eedc4f802 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -344,7 +344,9 @@ class StorageEngine final : public BaseStorageEngine { std::set get_broken_paths() { return _broken_paths; } - PriorTaskWorkerPool* missing_rowset_thread_pool; + Status submit_clone_task(Tablet* tablet, int64_t version); + + std::unordered_map>* workers; private: // Instance should be inited from `static open()` diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 818d9192141b42..6c37e55da757e4 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -217,39 +217,23 @@ Status EnginePublishVersionTask::execute() { continue; } auto handle_version_not_continuous = [&]() { - if (config::enable_mow_publish_clone_missing_rowset) { - std::vector backends; - if (!_engine.get_peers_replica_backends(tablet->tablet_id(), - &backends)) { - LOG(WARNING) << tablet->tablet_id() - << " tablet don't have peer replica backends"; - } - TAgentTaskRequest task; - TCloneReq req; - req.__set_tablet_id(tablet->tablet_id()); - req.__set_schema_hash(tablet->schema_hash()); - req.__set_src_backends(backends); - req.__set_version(version.first - 1); - req.__set_replica_id(tablet->replica_id()); - req.__set_partition_id(tablet->partition_id()); - req.__set_table_id(tablet->table_id()); - task.__set_task_type(TTaskType::CLONE); - task.__set_clone_req(req); - task.__set_priority(TPriority::HIGH); - task.__set_signature(tablet->tablet_id()); - PriorTaskWorkerPool* thread_pool = ExecEnv::GetInstance() - ->storage_engine() - .to_local() - .missing_rowset_thread_pool; - LOG_INFO("cumulative compaction submit missing rowset clone task.") + if (config::enable_auto_clone_on_mow_publish_missing_version) { + LOG_INFO("mow publish submit missing rowset clone task.") .tag("tablet_id", tablet->tablet_id()) .tag("version", version.first - 1) .tag("replica_id", tablet->replica_id()) .tag("partition_id", tablet->partition_id()) .tag("table_id", tablet->table_id()); - auto st = thread_pool->submit_high_prior_and_cancel_low(task); - if (!st.ok()) { - LOG_WARNING("mow clone missing rowset fail"); + Status st = _engine.submit_clone_task(tablet.get(), version.first - 1); + if (!st) { + LOG_WARNING( + "mow publish failed to submit missing rowset clone task.") + .tag("st", st.to_string()) + .tag("tablet_id", tablet->tablet_id()) + .tag("version", version.first - 1) + .tag("replica_id", tablet->replica_id()) + .tag("partition_id", tablet->partition_id()) + .tag("table_id", tablet->table_id()); } } add_error_tablet_id(tablet_info.tablet_id); diff --git a/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy index 8b23a8399ee1b6..a7f060a110888b 100644 --- a/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy @@ -25,7 +25,7 @@ suite('test_compaction_clone_missing_rowset_fault_injection', 'docker') { options.cloudMode = false options.enableDebugPoints() options.feConfigs += [ "disable_tablet_scheduler=true" ] - options.beConfigs += [ "enable_compaction_clone_missing_rowset=true" ] + options.beConfigs += [ "enable_auto_clone_on_compaction_missing_version=true" ] options.beNum = 3 docker(options) { diff --git a/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy index 000b411973e673..14f0073f5c88ec 100644 --- a/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy +++ b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy @@ -31,7 +31,7 @@ suite('test_mow_publish_clone_missing_rowset_fault_injection', 'docker') { options.cloudMode = false options.enableDebugPoints() options.feConfigs += [ "disable_tablet_scheduler=true" ] - options.beConfigs += [ "enable_mow_publish_clone_missing_rowset=false" ] + options.beConfigs += [ "enable_auto_clone_on_mow_publish_missing_version=false" ] options.beNum = 3 docker(options) { @@ -92,7 +92,7 @@ suite('test_mow_publish_clone_missing_rowset_fault_injection', 'docker') { assertFalse(out.contains("[5-5]")) assertFalse(out.contains("[6-6]")) - set_be_param("enable_mow_publish_clone_missing_rowset", "true", injectBe.Host, injectBe.HttpPort); + set_be_param("enable_auto_clone_on_mow_publish_missing_version", "true", injectBe.Host, injectBe.HttpPort); Thread.sleep(10000) // submit clone task sql """ INSERT INTO ${tableName} VALUES (6,0)""" From 67fa50bae4d5383f7fba30009db52e8a540f9090 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 9 Jul 2025 22:12:44 +0800 Subject: [PATCH 5/6] 5 --- be/src/olap/storage_engine.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 8fd6c4efc5c24b..5388e457352a6d 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1723,7 +1723,7 @@ Status StorageEngine::submit_clone_task(Tablet* tablet, int64_t version) { .tag("partition_id", tablet->partition_id()) .tag("table_id", tablet->table_id()); RETURN_IF_ERROR(assert_cast(workers->at(TTaskType::CLONE).get()) - ->submit_high_prior_and_cancel_low(task)); + ->submit_high_prior_and_cancel_low(task)); return Status::OK(); } From 3ef6d70a5e273dc988beac57e9750ded204e443c Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 10 Jul 2025 01:05:24 +0800 Subject: [PATCH 6/6] 6 --- be/src/agent/task_worker_pool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 026d6b91dc2bc7..cc59978da937e7 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -648,7 +648,7 @@ Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskReq // If it exists in the high priority queue, no operation is needed LOG_INFO("task has already existed in high prior queue.").tag("signature", signature); } - } while (true); + } while (false); // Set the receiving time of task so that we can determine whether it is timed out later (const_cast(task)).__set_recv_time(time(nullptr));