From d5fa7ee5b95fc704f3b0fef22df20f065f51ec7b Mon Sep 17 00:00:00 2001 From: yujun Date: Sun, 4 Aug 2024 10:52:32 +0800 Subject: [PATCH 1/3] fix clone new replica cause stale report --- be/src/agent/task_worker_pool.cpp | 67 ++++++++++++++++++- be/src/olap/task/engine_clone_task.cpp | 11 +-- be/src/olap/task/engine_clone_task.h | 6 +- .../org/apache/doris/master/MasterImpl.java | 5 ++ 4 files changed, 79 insertions(+), 10 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index e5bdf4dfb635cc..3866ba17876d59 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -98,6 +98,10 @@ std::unordered_map> s_task_signatur std::atomic_ulong s_report_version(time(nullptr) * 10000); +void increase_report_version() { + s_report_version.fetch_add(1, std::memory_order_relaxed); +} + // FIXME(plat1ko): Paired register and remove task info bool register_task_info(const TTaskType::type task_type, int64_t signature) { if (task_type == TTaskType::type::PUSH_STORAGE_POLICY || @@ -197,7 +201,7 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req } if (status.ok()) { - s_report_version.fetch_add(1, std::memory_order_relaxed); + increase_report_version(); } // Return result to fe @@ -1381,7 +1385,7 @@ void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) .tag("tablet_id", create_tablet_req.tablet_id) .error(status); } else { - s_report_version.fetch_add(1, std::memory_order_relaxed); + increase_report_version(); // get path hash of the created tablet TabletSharedPtr tablet; { @@ -1476,7 +1480,7 @@ void push_callback(const TAgentTaskRequest& req) { .tag("signature", req.signature) .tag("tablet_id", push_req.tablet_id) .tag("push_type", push_req.push_type); - ++s_report_version; + increase_report_version(); finish_task_request.__set_finish_tablet_infos(tablet_infos); } else { LOG_WARNING("failed to execute push task") @@ -1492,6 +1496,59 @@ void push_callback(const TAgentTaskRequest& req) { remove_task_info(req.task_type, req.signature); } +<<<<<<< HEAD +======= +void cloud_push_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { + const auto& push_req = req.push_req; + + LOG(INFO) << "get push task. signature=" << req.signature + << " push_type=" << push_req.push_type; + + // Return result to fe + TFinishTaskRequest finish_task_request; + finish_task_request.__set_backend(BackendOptions::get_local_backend()); + finish_task_request.__set_task_type(req.task_type); + finish_task_request.__set_signature(req.signature); + + // Only support DELETE in cloud mode now + if (push_req.push_type != TPushType::DELETE) { + finish_task_request.__set_task_status( + Status::NotSupported("push_type {} not is supported", + std::to_string(push_req.push_type)) + .to_thrift()); + return; + } + + finish_task_request.__set_request_version(push_req.version); + + DorisMetrics::instance()->delete_requests_total->increment(1); + auto st = CloudDeleteTask::execute(engine, req.push_req); + if (st.ok()) { + LOG_INFO("successfully execute push task") + .tag("signature", req.signature) + .tag("tablet_id", push_req.tablet_id) + .tag("push_type", push_req.push_type); + increase_report_version(); + auto& tablet_info = finish_task_request.finish_tablet_infos.emplace_back(); + // Just need tablet_id + tablet_info.tablet_id = push_req.tablet_id; + finish_task_request.__isset.finish_tablet_infos = true; + } else { + DorisMetrics::instance()->delete_requests_failed->increment(1); + LOG_WARNING("failed to execute push task") + .tag("signature", req.signature) + .tag("tablet_id", push_req.tablet_id) + .tag("push_type", push_req.push_type) + .error(st); + } + + finish_task_request.__set_task_status(st.to_thrift()); + finish_task_request.__set_report_version(s_report_version); + + finish_task(finish_task_request); + remove_task_info(req.task_type, req.signature); +} + PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine) : TaskWorkerPool("PUBLISH_VERSION", config::publish_version_worker_count, [this](const TAgentTaskRequest& task) { publish_version_callback(task); }), @@ -1743,6 +1800,10 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info, LOG_INFO("successfully clone tablet") .tag("signature", req.signature) .tag("tablet_id", clone_req.tablet_id); + if (engine_task.is_new_tablet()) { + increase_report_version(); + finish_task_request.__set_report_version(s_report_version); + } finish_task_request.__set_finish_tablet_infos(tablet_infos); } diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 62af1fec61a2b1..300b65527c1006 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -190,7 +190,7 @@ Status EngineCloneTask::_do_clone() { tablet->tablet_id(), tablet->replica_id(), false)); tablet.reset(); } - bool is_new_tablet = tablet == nullptr; + _is_new_tablet = tablet == nullptr; // try to incremental clone std::vector missed_versions; // try to repair a tablet with missing version @@ -229,7 +229,7 @@ Status EngineCloneTask::_do_clone() { if (missed_versions.empty()) { LOG(INFO) << "missed version size = 0, skip clone and return success. tablet_id=" << _clone_req.tablet_id << " replica_id=" << _clone_req.replica_id; - RETURN_IF_ERROR(_set_tablet_info(is_new_tablet)); + RETURN_IF_ERROR(_set_tablet_info()); return Status::OK(); } @@ -308,10 +308,11 @@ Status EngineCloneTask::_do_clone() { TabletMeta::construct_header_file_path(tablet_dir, _clone_req.tablet_id); RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(header_path)); } - return _set_tablet_info(is_new_tablet); + + return _set_tablet_info(); } -Status EngineCloneTask::_set_tablet_info(bool is_new_tablet) { +Status EngineCloneTask::_set_tablet_info() { // Get clone tablet info TTabletInfo tablet_info; tablet_info.__set_tablet_id(_clone_req.tablet_id); @@ -321,7 +322,7 @@ Status EngineCloneTask::_set_tablet_info(bool is_new_tablet) { if (_clone_req.__isset.version && tablet_info.version < _clone_req.version) { // if it is a new tablet and clone failed, then remove the tablet // if it is incremental clone, then must not drop the tablet - if (is_new_tablet) { + if (_is_new_tablet) { // we need to check if this cloned table's version is what we expect. // if not, maybe this is a stale remaining table which is waiting for drop. // we drop it. diff --git a/be/src/olap/task/engine_clone_task.h b/be/src/olap/task/engine_clone_task.h index 6924bfc2aa9ad7..3938ee58e80165 100644 --- a/be/src/olap/task/engine_clone_task.h +++ b/be/src/olap/task/engine_clone_task.h @@ -56,6 +56,8 @@ class EngineCloneTask : public EngineTask { vector* tablet_infos); ~EngineCloneTask() {} + bool is_new_tablet() const { return _is_new_tablet; } + private: Status _do_clone(); @@ -72,7 +74,7 @@ class EngineCloneTask : public EngineTask { const vector& missing_versions, bool* allow_incremental_clone); - Status _set_tablet_info(bool is_new_tablet); + Status _set_tablet_info(); // Download tablet files from Status _download_files(DataDir* data_dir, const std::string& remote_url_prefix, @@ -95,7 +97,7 @@ class EngineCloneTask : public EngineTask { int64_t _copy_size; int64_t _copy_time_ms; std::vector _pending_rs_guards; + bool _is_new_tablet = false; }; // EngineTask } // namespace doris -#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_CLONE_TASK_H diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 507378851a0b34..92479534241f4c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -532,6 +532,11 @@ private void finishDropReplica(AgentTask task) { private void finishClone(AgentTask task, TFinishTaskRequest request) { CloneTask cloneTask = (CloneTask) task; if (cloneTask.getTaskVersion() == CloneTask.VERSION_2) { + if (request.isSetReportVersion()) { + long reportVersion = request.getReportVersion(); + Env.getCurrentSystemInfo().updateBackendReportVersion( + task.getBackendId(), reportVersion, task.getDbId(), task.getTableId()); + } Env.getCurrentEnv().getTabletScheduler().finishCloneTask(cloneTask, request); } else { LOG.warn("invalid clone task, ignore it. {}", task); From 97a016fbcb9c665d939915be6ea3335f6c2af7f6 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Mon, 5 Aug 2024 09:49:13 +0800 Subject: [PATCH 2/3] fix clone new replica cause stale report --- be/src/agent/task_worker_pool.cpp | 53 ------------------------------- 1 file changed, 53 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 3866ba17876d59..e79f45f3d17fa8 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1496,59 +1496,6 @@ void push_callback(const TAgentTaskRequest& req) { remove_task_info(req.task_type, req.signature); } -<<<<<<< HEAD -======= -void cloud_push_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { - const auto& push_req = req.push_req; - - LOG(INFO) << "get push task. signature=" << req.signature - << " push_type=" << push_req.push_type; - - // Return result to fe - TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(req.task_type); - finish_task_request.__set_signature(req.signature); - - // Only support DELETE in cloud mode now - if (push_req.push_type != TPushType::DELETE) { - finish_task_request.__set_task_status( - Status::NotSupported("push_type {} not is supported", - std::to_string(push_req.push_type)) - .to_thrift()); - return; - } - - finish_task_request.__set_request_version(push_req.version); - - DorisMetrics::instance()->delete_requests_total->increment(1); - auto st = CloudDeleteTask::execute(engine, req.push_req); - if (st.ok()) { - LOG_INFO("successfully execute push task") - .tag("signature", req.signature) - .tag("tablet_id", push_req.tablet_id) - .tag("push_type", push_req.push_type); - increase_report_version(); - auto& tablet_info = finish_task_request.finish_tablet_infos.emplace_back(); - // Just need tablet_id - tablet_info.tablet_id = push_req.tablet_id; - finish_task_request.__isset.finish_tablet_infos = true; - } else { - DorisMetrics::instance()->delete_requests_failed->increment(1); - LOG_WARNING("failed to execute push task") - .tag("signature", req.signature) - .tag("tablet_id", push_req.tablet_id) - .tag("push_type", push_req.push_type) - .error(st); - } - - finish_task_request.__set_task_status(st.to_thrift()); - finish_task_request.__set_report_version(s_report_version); - - finish_task(finish_task_request); - remove_task_info(req.task_type, req.signature); -} - PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine) : TaskWorkerPool("PUBLISH_VERSION", config::publish_version_worker_count, [this](const TAgentTaskRequest& task) { publish_version_callback(task); }), From 668646a445308019d11f2002abd2b7102bedf65e Mon Sep 17 00:00:00 2001 From: yujun777 Date: Mon, 5 Aug 2024 15:05:25 +0800 Subject: [PATCH 3/3] update --- be/src/olap/task/engine_clone_task.h | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/olap/task/engine_clone_task.h b/be/src/olap/task/engine_clone_task.h index 3938ee58e80165..80b9fdf4213f4a 100644 --- a/be/src/olap/task/engine_clone_task.h +++ b/be/src/olap/task/engine_clone_task.h @@ -101,3 +101,4 @@ class EngineCloneTask : public EngineTask { }; // EngineTask } // namespace doris +#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_CLONE_TASK_H