From add17d4a26a0b6376bb6a60d1dcec7b68a3c0249 Mon Sep 17 00:00:00 2001 From: liaoxin Date: Sun, 30 Mar 2025 11:44:51 +0000 Subject: [PATCH] [fix](load) Fix the issue of insert into select concurrent reporting causing no error url --- be/src/pipeline/pipeline_fragment_context.cpp | 27 ++++++++ be/src/pipeline/pipeline_fragment_context.h | 2 + be/src/runtime/fragment_mgr.cpp | 27 ++------ be/src/runtime/query_context.cpp | 10 +++ be/src/runtime/query_context.h | 7 ++ be/src/runtime/runtime_state.cpp | 10 +-- .../test_insert_error_url.groovy | 67 +++++++++++++++++++ 7 files changed, 126 insertions(+), 24 deletions(-) create mode 100644 regression-test/suites/fault_injection_p0/test_insert_error_url.groovy diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 9e1f96d737ad49..141eb997962efd 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -201,6 +201,11 @@ void PipelineFragmentContext::cancel(const Status reason) { if (reason.is() || reason.is()) { print_profile("cancel pipeline, reason: " + reason.to_string()); } + + if (auto error_url = get_load_error_url(); !error_url.empty()) { + _query_ctx->set_load_error_url(error_url); + } + _query_ctx->cancel(reason, _fragment_id); if (reason.is()) { _is_report_on_cancel = false; @@ -1790,6 +1795,23 @@ void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) { } } +std::string PipelineFragmentContext::get_load_error_url() { + if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) { + return to_load_error_http_path(str); + } + for (auto& task_states : _task_runtime_states) { + for (auto& task_state : task_states) { + if (!task_state) { + continue; + } + if (const auto& str = task_state->get_error_log_file_path(); !str.empty()) { + return to_load_error_http_path(str); + } + } + } + return ""; +} + Status PipelineFragmentContext::send_report(bool done) { Status 
exec_status = _query_ctx->exec_status(); // If plan is done successfully, but _is_report_success is false, @@ -1820,6 +1842,10 @@ Status PipelineFragmentContext::send_report(bool done) { } } + std::string load_error_url = _query_ctx->get_load_error_url().empty() + ? get_load_error_url() + : _query_ctx->get_load_error_url(); + ReportStatusRequest req {exec_status, runtime_states, done || !exec_status.ok(), @@ -1829,6 +1855,7 @@ Status PipelineFragmentContext::send_report(bool done) { TUniqueId(), -1, _runtime_state.get(), + load_error_url, [this](const Status& reason) { cancel(reason); }}; return _report_status_cb( diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 992f86bc76d235..400f000f74344a 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -126,6 +126,8 @@ class PipelineFragmentContext : public TaskExecutionContext { } } + std::string get_load_error_url(); + private: Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorPtr* root, PipelinePtr cur_pipe); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 876d221284aea9..20ec95255f73a3 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -457,26 +457,13 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered)); params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); - if (!req.runtime_state->get_error_log_file_path().empty()) { - std::string error_log_url = - to_load_error_http_path(req.runtime_state->get_error_log_file_path()); - LOG(INFO) << "error log file path: " << error_log_url - << ", query id: " << print_id(req.query_id) - << ", fragment instance id: " << print_id(req.fragment_instance_id); -
params.__set_tracking_url(error_log_url); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (!rs->get_error_log_file_path().empty()) { - std::string error_log_url = to_load_error_http_path(rs->get_error_log_file_path()); - LOG(INFO) << "error log file path: " << error_log_url - << ", query id: " << print_id(req.query_id) - << ", fragment instance id: " << print_id(rs->fragment_instance_id()); - params.__set_tracking_url(error_log_url); - } - if (rs->wal_id() > 0) { - params.__set_txn_id(rs->wal_id()); - params.__set_label(rs->import_label()); - } + if (!req.load_error_url.empty()) { + params.__set_tracking_url(req.load_error_url); + } + for (auto* rs : req.runtime_states) { + if (rs->wal_id() > 0) { + params.__set_txn_id(rs->wal_id()); + params.__set_label(rs->import_label()); } } if (!req.runtime_state->export_output_files().empty()) { diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index fb6eee2bfd6b85..6ae772e6737a38 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -348,6 +348,16 @@ void QueryContext::cancel(Status new_status, int fragment_id) { cancel_all_pipeline_context(new_status, fragment_id); } +void QueryContext::set_load_error_url(std::string error_url) { + std::lock_guard lock(_error_url_lock); + _load_error_url = error_url; +} + +std::string QueryContext::get_load_error_url() { + std::lock_guard lock(_error_url_lock); + return _load_error_url; +} + void QueryContext::cancel_all_pipeline_context(const Status& reason, int fragment_id) { std::vector> ctx_to_cancel; { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 841288751762cc..d5b7d27ecbda1a 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -61,6 +61,7 @@ struct ReportStatusRequest { TUniqueId fragment_instance_id; int backend_num; RuntimeState* runtime_state; + std::string load_error_url; std::function cancel_fn; }; @@ 
-392,6 +393,9 @@ class QueryContext : public std::enable_shared_from_this { std::string debug_string(); + void set_load_error_url(std::string error_url); + std::string get_load_error_url(); + private: int _timeout_second; TUniqueId _query_id; @@ -481,6 +485,9 @@ class QueryContext : public std::enable_shared_from_this { std::unordered_map>> _collect_realtime_query_profile() const; + std::mutex _error_url_lock; + std::string _load_error_url; + public: // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile void add_fragment_profile( diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 88a38d6356d3aa..dd5a2e6355c3e7 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -414,6 +414,11 @@ Status RuntimeState::append_error_msg_to_file(std::function line, } std::string RuntimeState::get_error_log_file_path() { + DBUG_EXECUTE_IF("RuntimeState::get_error_log_file_path.block", { + if (!_error_log_file_path.empty()) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + }); std::lock_guard l(_s3_error_log_file_lock); if (_s3_error_fs && _error_log_file && _error_log_file->is_open()) { // close error log file @@ -422,10 +427,7 @@ std::string RuntimeState::get_error_log_file_path() { _exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path); // upload error log file to s3 Status st = _s3_error_fs->upload(error_log_absolute_path, _s3_error_log_file_path); - if (st.ok()) { - // remove local error log file - std::filesystem::remove(error_log_absolute_path); - } else { + if (!st.ok()) { // upload failed and return local error log file path LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path=" << _error_log_file_path << ", error=" << st; diff --git a/regression-test/suites/fault_injection_p0/test_insert_error_url.groovy b/regression-test/suites/fault_injection_p0/test_insert_error_url.groovy new file mode 100644 
index 00000000000000..aa96642ac802f3 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_insert_error_url.groovy @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_insert_error_url", "nonConcurrent") { + def tableName = "test_insert_error_url_tbl" + sql """drop table if exists ${tableName}""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + L_ORDERKEY INTEGER NOT NULL, + L_PARTKEY INTEGER NOT NULL, + L_SUPPKEY INTEGER NOT NULL, + L_LINENUMBER INTEGER NOT NULL, + L_QUANTITY DECIMAL(15,2) NOT NULL, + L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL, + L_DISCOUNT DECIMAL(15,2) NOT NULL, + L_TAX DECIMAL(15,2) NOT NULL, + L_RETURNFLAG CHAR(1) NOT NULL, + L_LINESTATUS CHAR(1) NOT NULL, + L_SHIPDATE DATE NOT NULL, + L_COMMITDATE DATE NOT NULL, + L_RECEIPTDATE DATE NOT NULL, + L_SHIPINSTRUCT CHAR(25) NOT NULL, + L_SHIPMODE CHAR(10) NOT NULL, + L_COMMENT VARCHAR(44) NOT NULL + ) + UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER) + PARTITION BY RANGE(L_ORDERKEY) ( + PARTITION p2023 VALUES LESS THAN ("5000000") + ) + DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + try { + 
GetDebugPoint().enableDebugPointForAllBEs("RuntimeState::get_error_log_file_path.block") + expectExceptionLike({ + sql """ + insert into ${tableName} select * from S3( + "uri" = "http://${getS3BucketName()}.${getS3Endpoint()}/regression/tpch/sf1/lineitem.csv.split01.gz", + "s3.access_key"= "${getS3AK()}", + "s3.secret_key" = "${getS3SK()}", + "s3.region" = "${getS3Region()}", + "format" = "csv", + "column_separator" = "|" + ); + """ + }, "error_log") + } finally { + GetDebugPoint().disableDebugPointForAllBEs("RuntimeState::get_error_log_file_path.block") + } +} \ No newline at end of file