From b18a9f1d9fbed5ff79ee90d84489c8fe3a822129 Mon Sep 17 00:00:00 2001 From: /bin/eash Date: Tue, 12 Aug 2025 18:18:40 +0800 Subject: [PATCH 1/6] [fix](load) Fix the issue of insert into select reporting causing no error url --- be/src/pipeline/pipeline_fragment_context.cpp | 1 + be/src/pipeline/pipeline_fragment_context.h | 2 ++ .../pipeline_x_fragment_context.cpp | 27 ++++++++++++++++++- .../pipeline_x/pipeline_x_fragment_context.h | 2 ++ be/src/runtime/fragment_mgr.cpp | 12 ++------- be/src/runtime/plan_fragment_executor.cpp | 1 + be/src/runtime/query_context.cpp | 10 +++++++ be/src/runtime/query_context.h | 7 +++++ 8 files changed, 51 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index a6a2fb7ab767e7..78247c02665e8e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1000,6 +1000,7 @@ Status PipelineFragmentContext::send_report(bool done) { _fragment_instance_id, _backend_num, _runtime_state.get(), + _query_ctx->get_load_error_url(), [this](Status st) { return update_status(st); }, [this](const PPlanFragmentCancelReason& reason, const std::string& msg) { cancel(reason, msg); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index b6ed9d42fd05ff..f7a75123cf2ad5 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -141,6 +141,8 @@ class PipelineFragmentContext : public TaskExecutionContext { uint64_t elapsed_time() const { return _fragment_watcher.elapsed_time(); } + std::string get_load_error_url(); + protected: Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state); Status _build_pipelines(ExecNode*, PipelinePtr); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index e45033b71e3b04..2666c76c6cd4e4 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -160,6 +160,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, this->debug_string()); } } + if (auto error_url = get_load_error_url(); !error_url.empty()) { + _query_ctx->set_load_error_url(error_url); + } if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { _is_report_on_cancel = false; @@ -1565,6 +1568,23 @@ void PipelineXFragmentContext::_close_fragment_instance() { std::dynamic_pointer_cast(shared_from_this())); } +std::string PipelineXFragmentContext::get_load_error_url() { + if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) { + return to_load_error_http_path(str); + } + for (auto& task_states : _task_runtime_states) { + for (auto& task_state : task_states) { + if (!task_state) { + continue; + } + if (const auto& str = task_state->get_error_log_file_path(); !str.empty()) { + return to_load_error_http_path(str); + } + } + } + return ""; +} + Status PipelineXFragmentContext::send_report(bool done) { Status exec_status = Status::OK(); { @@ -1595,11 +1615,16 @@ Status PipelineXFragmentContext::send_report(bool done) { } } } + + std::string load_eror_url = _query_ctx->get_load_error_url().empty() + ? get_load_error_url() + : _query_ctx->get_load_error_url(); + return _report_status_cb( {true, exec_status, runtime_states, _runtime_profile.get(), _runtime_state->load_channel_profile(), done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num, - _runtime_state.get(), [this](Status st) { return update_status(st); }, + _runtime_state.get(), load_eror_url,[this](Status st) { return update_status(st); }, [this](const PPlanFragmentCancelReason& reason, const std::string& msg) { cancel(reason, msg); }}, diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index c3e3f5596ad382..2a4b7294b349a4 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -118,6 +118,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext { void close_a_pipeline(PipelineId pipeline_id) override; + std::string get_load_error_url(); + private: void _close_fragment_instance() override; Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request, diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 896ddece1f586a..38637e0dc746d4 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -534,16 +534,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered)); params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); - if (!req.runtime_state->get_error_log_file_path().empty()) { - params.__set_tracking_url( - to_load_error_http_path(req.runtime_state->get_error_log_file_path())); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (!rs->get_error_log_file_path().empty()) { - params.__set_tracking_url( - to_load_error_http_path(rs->get_error_log_file_path())); - } - } + if (!req.load_error_url.empty()) { + params.__set_tracking_url(req.load_error_url); } if (!req.runtime_state->export_output_files().empty()) { params.__isset.export_files = true; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 4b6a6a0539dd58..a5a391316af218 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -515,6 +515,7 @@ void PlanFragmentExecutor::send_report(bool done) { _fragment_instance_id, _backend_num, _runtime_state.get(), + _query_ctx->get_load_error_url(), std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1), std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1, std::placeholders::_2)}; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 4e050f17b4e048..67424013672dd5 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -251,6 +251,16 @@ void QueryContext::cancel(std::string msg, Status new_status, int fragment_id) { } } +void QueryContext::set_load_error_url(std::string error_url) { + std::lock_guard lock(_error_url_lock); + _load_error_url = error_url; +} + +std::string QueryContext::get_load_error_url() { + std::lock_guard lock(_error_url_lock); + return _load_error_url; +} + void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason& reason, const std::string& msg) { set_execution_dependency_ready(); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 5a11905f21a026..d60d9b03c69ca1 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -58,6 +58,7 @@ struct ReportStatusRequest { TUniqueId fragment_instance_id; int backend_num; RuntimeState* runtime_state; + std::string load_error_url; std::function update_fn; std::function cancel_fn; }; @@ -330,6 +331,9 @@ class QueryContext { fragment_instance_ids.push_back(ins_id); } + void set_load_error_url(std::string error_url); + std::string get_load_error_url(); + private: std::mutex _ins_lock; TUniqueId _query_id; @@ -392,6 +396,9 @@ class QueryContext { std::mutex _brpc_stubs_mutex; std::unordered_map> _using_brpc_stubs; + std::mutex _error_url_lock; + std::string _load_error_url; + public: timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; } QuerySource get_query_source() const { return this->_query_source; } From ae0e623f46be903dc0b645baba9e1b5d13dc0516 Mon Sep 17 00:00:00 2001 From: /bin/eash Date: Tue, 12 Aug 2025 18:26:40 +0800 Subject: [PATCH 2/6] code format --- be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 2666c76c6cd4e4..f145314bd04e99 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1617,14 +1617,14 @@ Status PipelineXFragmentContext::send_report(bool done) { } std::string load_eror_url = _query_ctx->get_load_error_url().empty() - ? get_load_error_url() - : _query_ctx->get_load_error_url(); + ? get_load_error_url() + : _query_ctx->get_load_error_url(); return _report_status_cb( {true, exec_status, runtime_states, _runtime_profile.get(), _runtime_state->load_channel_profile(), done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num, - _runtime_state.get(), load_eror_url,[this](Status st) { return update_status(st); }, + _runtime_state.get(), load_eror_url, [this](Status st) { return update_status(st); }, [this](const PPlanFragmentCancelReason& reason, const std::string& msg) { cancel(reason, msg); }}, From 4533500f8324964c8cf2f71ed60d79ee20ca8ca0 Mon Sep 17 00:00:00 2001 From: /bin/eash Date: Thu, 14 Aug 2025 16:22:20 +0800 Subject: [PATCH 3/6] p0 test_etl_failed.groovy --- .../suites/load_p0/broker_load/test_etl_failed.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy index c418223db279db..0c2d1f105d05e0 100644 --- a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy +++ b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy @@ -65,7 +65,7 @@ suite("test_etl_failed", "load_p0") { assertTrue(1 == 2, "etl should be failed") break; } - if (result[0][2].equals("CANCELLED") && result[0][13].contains("_load_error_log")) { + if (result[0][2].equals("CANCELLED") && result[0][13] != null && result[0][13].contains("_load_error_log")) { break; } Thread.sleep(1000) From af42edf43d7d6b83c4835d90d3bd776fe9f17ef9 Mon Sep 17 00:00:00 2001 From: /bin/eash Date: Mon, 18 Aug 2025 16:21:08 +0800 Subject: [PATCH 4/6] p0 test_etl_failed.groovy --- .../suites/load_p0/broker_load/test_etl_failed.groovy | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy index 0c2d1f105d05e0..2ac7f64727ee66 100644 --- a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy +++ b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy @@ -65,7 +65,12 @@ suite("test_etl_failed", "load_p0") { assertTrue(1 == 2, "etl should be failed") break; } - if (result[0][2].equals("CANCELLED") && result[0][13] != null && result[0][13].contains("_load_error_log")) { + if (result[0][2].equals("CANCELLED")) { + logger.info("ErrorMsg: ",{result[0][7]}); + if (result[0][7].contains("ETL_QUALITY_UNSATISFIED" ) && !result[0][13].contains("_load_error_log")) { + assertTrue(1 == 2, "etl should be failed") + break; + } break; } Thread.sleep(1000) From 0a2726129b0dad9155bf04f01a4b5d7a59fff0ac Mon Sep 17 00:00:00 2001 From: /bin/eash Date: Mon, 18 Aug 2025 18:43:46 +0800 Subject: [PATCH 5/6] p0 test_etl_failed.groovy --- .../suites/load_p0/broker_load/test_etl_failed.groovy | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy index 2ac7f64727ee66..cd6523e550fe79 100644 --- a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy +++ b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy @@ -66,8 +66,9 @@ suite("test_etl_failed", "load_p0") { break; } if (result[0][2].equals("CANCELLED")) { - logger.info("ErrorMsg: ",{result[0][7]}); - if (result[0][7].contains("ETL_QUALITY_UNSATISFIED" ) && !result[0][13].contains("_load_error_log")) { + def reason = result[0][7] + logger.info("load failed, ErrorMsg: $reason"); + if (result[0][7] != null && result[0][7].contains("ETL_QUALITY_UNSATISFIED" ) && !result[0][13].contains("_load_error_log")) { assertTrue(1 == 2, "etl should be failed") break; } From fbce61c550dba613af094a52d70d0615c593e470 Mon Sep 17 00:00:00 2001 From: /bin/eash Date: Tue, 19 Aug 2025 18:41:20 +0800 Subject: [PATCH 6/6] fix for p0 --- be/src/pipeline/pipeline_fragment_context.cpp | 13 ++++++++++++- be/src/runtime/plan_fragment_executor.cpp | 15 ++++++++++++++- be/src/runtime/plan_fragment_executor.h | 2 ++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 78247c02665e8e..a72a57a418135e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -966,6 +966,13 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) { } } +std::string PipelineFragmentContext::get_load_error_url() { + if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) { + return to_load_error_http_path(str); + } + return ""; +} + Status PipelineFragmentContext::send_report(bool done) { Status exec_status = Status::OK(); { @@ -987,6 +994,10 @@ Status PipelineFragmentContext::send_report(bool done) { return Status::NeedSendAgain(""); } + std::string load_eror_url = _query_ctx->get_load_error_url().empty() + ? get_load_error_url() + : _query_ctx->get_load_error_url(); + return _report_status_cb( {false, exec_status, @@ -1000,7 +1011,7 @@ Status PipelineFragmentContext::send_report(bool done) { _fragment_instance_id, _backend_num, _runtime_state.get(), - _query_ctx->get_load_error_url(), + load_eror_url, [this](Status st) { return update_status(st); }, [this](const PPlanFragmentCancelReason& reason, const std::string& msg) { cancel(reason, msg); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index a5a391316af218..77886396285b47 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -44,6 +44,7 @@ #include "io/fs/stream_load_pipe.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/query_context.h" #include "runtime/query_statistics.h" @@ -482,6 +483,13 @@ void PlanFragmentExecutor::report_profile() { VLOG_FILE << "exiting reporting thread: instance_id=" << _runtime_state->fragment_instance_id(); } +std::string PlanFragmentExecutor::get_load_error_url() { + if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) { + return to_load_error_http_path(str); + } + return ""; +} + void PlanFragmentExecutor::send_report(bool done) { Status status = Status::OK(); { @@ -502,6 +510,11 @@ void PlanFragmentExecutor::send_report(bool done) { if (!_is_report_success && !_is_report_on_cancel) { return; } + + std::string load_eror_url = _query_ctx->get_load_error_url().empty() + ? get_load_error_url() + : _query_ctx->get_load_error_url(); + ReportStatusRequest report_req = { false, status, @@ -515,7 +528,7 @@ void PlanFragmentExecutor::send_report(bool done) { _fragment_instance_id, _backend_num, _runtime_state.get(), - _query_ctx->get_load_error_url(), + load_eror_url, std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1), std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1, std::placeholders::_2)}; diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 89b2534b61b93c..b2725b55e45eeb 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -146,6 +146,8 @@ class PlanFragmentExecutor : public TaskExecutionContext { Status update_status(Status status); + std::string get_load_error_url(); + private: ExecEnv* _exec_env = nullptr; // not owned ExecNode* _plan = nullptr; // lives in _runtime_state->obj_pool()