Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ void PipelineFragmentContext::cancel(const Status reason) {
if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
print_profile("cancel pipeline, reason: " + reason.to_string());
}

if (auto error_url = get_load_error_url(); !error_url.empty()) {
_query_ctx->set_load_error_url(error_url);
}

_query_ctx->cancel(reason, _fragment_id);
if (reason.is<ErrorCode::LIMIT_REACH>()) {
_is_report_on_cancel = false;
Expand Down Expand Up @@ -1790,6 +1795,23 @@ void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
}
}

// Resolve the HTTP URL of this fragment's load error log, if one exists.
// The fragment-level runtime state is consulted first; if it has no error
// log file, every per-task runtime state is scanned. Returns an empty
// string when no error log file has been produced anywhere.
std::string PipelineFragmentContext::get_load_error_url() {
    const auto& fragment_level_path = _runtime_state->get_error_log_file_path();
    if (!fragment_level_path.empty()) {
        return to_load_error_http_path(fragment_level_path);
    }
    // No fragment-level error log; fall back to the per-task states.
    for (const auto& states_of_pipeline : _task_runtime_states) {
        for (const auto& task_state : states_of_pipeline) {
            if (task_state == nullptr) {
                continue;
            }
            const auto& task_level_path = task_state->get_error_log_file_path();
            if (!task_level_path.empty()) {
                return to_load_error_http_path(task_level_path);
            }
        }
    }
    return {};
}

Status PipelineFragmentContext::send_report(bool done) {
Status exec_status = _query_ctx->exec_status();
// If plan is done successfully, but _is_report_success is false,
Expand Down Expand Up @@ -1820,6 +1842,10 @@ Status PipelineFragmentContext::send_report(bool done) {
}
}

std::string load_eror_url = _query_ctx->get_load_error_url().empty()
? get_load_error_url()
: _query_ctx->get_load_error_url();

ReportStatusRequest req {exec_status,
runtime_states,
done || !exec_status.ok(),
Expand All @@ -1829,6 +1855,7 @@ Status PipelineFragmentContext::send_report(bool done) {
TUniqueId(),
-1,
_runtime_state.get(),
load_eror_url,
[this](const Status& reason) { cancel(reason); }};

return _report_status_cb(
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class PipelineFragmentContext : public TaskExecutionContext {
}
}

std::string get_load_error_url();

private:
Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs, OperatorPtr* root, PipelinePtr cur_pipe);
Expand Down
27 changes: 7 additions & 20 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,26 +457,13 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));

if (!req.runtime_state->get_error_log_file_path().empty()) {
std::string error_log_url =
to_load_error_http_path(req.runtime_state->get_error_log_file_path());
LOG(INFO) << "error log file path: " << error_log_url
<< ", query id: " << print_id(req.query_id)
<< ", fragment instance id: " << print_id(req.fragment_instance_id);
params.__set_tracking_url(error_log_url);
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->get_error_log_file_path().empty()) {
std::string error_log_url = to_load_error_http_path(rs->get_error_log_file_path());
LOG(INFO) << "error log file path: " << error_log_url
<< ", query id: " << print_id(req.query_id)
<< ", fragment instance id: " << print_id(rs->fragment_instance_id());
params.__set_tracking_url(error_log_url);
}
if (rs->wal_id() > 0) {
params.__set_txn_id(rs->wal_id());
params.__set_label(rs->import_label());
}
if (!req.load_error_url.empty()) {
params.__set_tracking_url(req.load_error_url);
}
for (auto* rs : req.runtime_states) {
if (rs->wal_id() > 0) {
params.__set_txn_id(rs->wal_id());
params.__set_label(rs->import_label());
}
}
if (!req.runtime_state->export_output_files().empty()) {
Expand Down
10 changes: 10 additions & 0 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,16 @@ void QueryContext::cancel(Status new_status, int fragment_id) {
cancel_all_pipeline_context(new_status, fragment_id);
}

// Record the load error log URL for this query so it can be attached to the
// status report sent to FE. Thread-safe: guarded by _error_url_lock.
// @param error_url  sink parameter; taken by value and moved into the member.
void QueryContext::set_load_error_url(std::string error_url) {
    std::lock_guard<std::mutex> lock(_error_url_lock);
    // Move instead of copy: the by-value parameter is a sink.
    _load_error_url = std::move(error_url);
}

// Fetch the load error log URL recorded for this query; empty string when
// none has been set. Thread-safe: guarded by _error_url_lock.
std::string QueryContext::get_load_error_url() {
    std::unique_lock<std::mutex> guard(_error_url_lock);
    return _load_error_url;
}

void QueryContext::cancel_all_pipeline_context(const Status& reason, int fragment_id) {
std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
{
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct ReportStatusRequest {
TUniqueId fragment_instance_id;
int backend_num;
RuntimeState* runtime_state;
std::string load_error_url;
std::function<void(const Status&)> cancel_fn;
};

Expand Down Expand Up @@ -392,6 +393,9 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {

std::string debug_string();

void set_load_error_url(std::string error_url);
std::string get_load_error_url();

private:
int _timeout_second;
TUniqueId _query_id;
Expand Down Expand Up @@ -481,6 +485,9 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
_collect_realtime_query_profile() const;

std::mutex _error_url_lock;
std::string _load_error_url;

public:
// when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
void add_fragment_profile(
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
}

std::string RuntimeState::get_error_log_file_path() {
DBUG_EXECUTE_IF("RuntimeState::get_error_log_file_path.block", {
if (!_error_log_file_path.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
std::lock_guard<std::mutex> l(_s3_error_log_file_lock);
if (_s3_error_fs && _error_log_file && _error_log_file->is_open()) {
// close error log file
Expand All @@ -422,10 +427,7 @@ std::string RuntimeState::get_error_log_file_path() {
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
// upload error log file to s3
Status st = _s3_error_fs->upload(error_log_absolute_path, _s3_error_log_file_path);
if (st.ok()) {
// remove local error log file
std::filesystem::remove(error_log_absolute_path);
} else {
if (!st.ok()) {
// upload failed and return local error log file path
LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path="
<< _error_log_file_path << ", error=" << st;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test: when an insert-from-S3 load fails, the user-visible
// exception must carry the error-log tracking URL ("error_log" appears in
// the URL path). The BE debug point
// "RuntimeState::get_error_log_file_path.block" delays the error-log path
// lookup (sleeps when the path is non-empty), exercising the race between
// cancel and report when collecting the URL.
// Tagged "nonConcurrent" because the debug point is toggled on all BEs.
suite("test_insert_error_url", "nonConcurrent") {
def tableName = "test_insert_error_url_tbl"
sql """drop table if exists ${tableName}"""
// Strict schema: NOT NULL everywhere and a single range partition
// (L_ORDERKEY < 5000000). Rows that violate it should be rejected and
// produce an error log on the BE — presumably the lineitem data contains
// such rows; confirm against the dataset if this test stops failing.
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
L_ORDERKEY INTEGER NOT NULL,
L_PARTKEY INTEGER NOT NULL,
L_SUPPKEY INTEGER NOT NULL,
L_LINENUMBER INTEGER NOT NULL,
L_QUANTITY DECIMAL(15,2) NOT NULL,
L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
L_DISCOUNT DECIMAL(15,2) NOT NULL,
L_TAX DECIMAL(15,2) NOT NULL,
L_RETURNFLAG CHAR(1) NOT NULL,
L_LINESTATUS CHAR(1) NOT NULL,
L_SHIPDATE DATE NOT NULL,
L_COMMITDATE DATE NOT NULL,
L_RECEIPTDATE DATE NOT NULL,
L_SHIPINSTRUCT CHAR(25) NOT NULL,
L_SHIPMODE CHAR(10) NOT NULL,
L_COMMENT VARCHAR(44) NOT NULL
)
UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
PARTITION BY RANGE(L_ORDERKEY) (
PARTITION p2023 VALUES LESS THAN ("5000000")
)
DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);
"""

try {
// Enable the delay before running the load; must match the debug point
// name in be/src/runtime/runtime_state.cpp exactly.
GetDebugPoint().enableDebugPointForAllBEs("RuntimeState::get_error_log_file_path.block")
// The load is expected to fail, and the exception message must contain
// "error_log" — i.e. the tracking URL was propagated to FE.
expectExceptionLike({
sql """
insert into ${tableName} select * from S3(
"uri" = "http://${getS3BucketName()}.${getS3Endpoint()}/regression/tpch/sf1/lineitem.csv.split01.gz",
"s3.access_key"= "${getS3AK()}",
"s3.secret_key" = "${getS3SK()}",
"s3.region" = "${getS3Region()}",
"format" = "csv",
"column_separator" = "|"
);
"""
}, "error_log")
} finally {
// Always clear the debug point so later suites are unaffected.
GetDebugPoint().disableDebugPointForAllBEs("RuntimeState::get_error_log_file_path.block")
}
}
Loading