Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,13 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
}
}

// Returns the load error URL for this fragment: the path reported by
// _runtime_state->get_error_log_file_path(), converted with
// to_load_error_http_path(). Returns an empty string when that path is empty.
std::string PipelineFragmentContext::get_load_error_url() {
    const auto& error_log_path = _runtime_state->get_error_log_file_path();
    if (error_log_path.empty()) {
        // No error log file has been recorded on the runtime state.
        return "";
    }
    return to_load_error_http_path(error_log_path);
}

Status PipelineFragmentContext::send_report(bool done) {
Status exec_status = Status::OK();
{
Expand All @@ -987,6 +994,10 @@ Status PipelineFragmentContext::send_report(bool done) {
return Status::NeedSendAgain("");
}

std::string load_eror_url = _query_ctx->get_load_error_url().empty()
? get_load_error_url()
: _query_ctx->get_load_error_url();

return _report_status_cb(
{false,
exec_status,
Expand All @@ -1000,6 +1011,7 @@ Status PipelineFragmentContext::send_report(bool done) {
_fragment_instance_id,
_backend_num,
_runtime_state.get(),
load_eror_url,
[this](Status st) { return update_status(st); },
[this](const PPlanFragmentCancelReason& reason, const std::string& msg) {
cancel(reason, msg);
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class PipelineFragmentContext : public TaskExecutionContext {

uint64_t elapsed_time() const { return _fragment_watcher.elapsed_time(); }

std::string get_load_error_url();

protected:
Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state);
Status _build_pipelines(ExecNode*, PipelinePtr);
Expand Down
27 changes: 26 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
this->debug_string());
}
}
if (auto error_url = get_load_error_url(); !error_url.empty()) {
_query_ctx->set_load_error_url(error_url);
}

if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_is_report_on_cancel = false;
Expand Down Expand Up @@ -1565,6 +1568,23 @@ void PipelineXFragmentContext::_close_fragment_instance() {
std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this()));
}

// Returns the load error URL for this fragment.
//
// The fragment-level runtime state is consulted first. If it has no error log
// file path, every non-null entry of _task_runtime_states is checked in
// iteration order and the first non-empty path is used. The chosen path is
// converted with to_load_error_http_path(). Returns an empty string when no
// runtime state reports an error log file.
std::string PipelineXFragmentContext::get_load_error_url() {
    const auto& fragment_log_path = _runtime_state->get_error_log_file_path();
    if (!fragment_log_path.empty()) {
        return to_load_error_http_path(fragment_log_path);
    }

    for (auto& state_group : _task_runtime_states) {
        for (auto& state : state_group) {
            // Entries may be null; only populated states can carry a path.
            if (state) {
                const auto& task_log_path = state->get_error_log_file_path();
                if (!task_log_path.empty()) {
                    return to_load_error_http_path(task_log_path);
                }
            }
        }
    }
    return "";
}

Status PipelineXFragmentContext::send_report(bool done) {
Status exec_status = Status::OK();
{
Expand Down Expand Up @@ -1595,11 +1615,16 @@ Status PipelineXFragmentContext::send_report(bool done) {
}
}
}

std::string load_eror_url = _query_ctx->get_load_error_url().empty()
? get_load_error_url()
: _query_ctx->get_load_error_url();

return _report_status_cb(
{true, exec_status, runtime_states, _runtime_profile.get(),
_runtime_state->load_channel_profile(), done || !exec_status.ok(),
_query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num,
_runtime_state.get(), [this](Status st) { return update_status(st); },
_runtime_state.get(), load_eror_url, [this](Status st) { return update_status(st); },
[this](const PPlanFragmentCancelReason& reason, const std::string& msg) {
cancel(reason, msg);
}},
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext {

void close_a_pipeline(PipelineId pipeline_id) override;

std::string get_load_error_url();

private:
void _close_fragment_instance() override;
Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request,
Expand Down
12 changes: 2 additions & 10 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,16 +534,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));

if (!req.runtime_state->get_error_log_file_path().empty()) {
params.__set_tracking_url(
to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->get_error_log_file_path().empty()) {
params.__set_tracking_url(
to_load_error_http_path(rs->get_error_log_file_path()));
}
}
if (!req.load_error_url.empty()) {
params.__set_tracking_url(req.load_error_url);
}
if (!req.runtime_state->export_output_files().empty()) {
params.__isset.export_files = true;
Expand Down
14 changes: 14 additions & 0 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "io/fs/stream_load_pipe.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/query_context.h"
#include "runtime/query_statistics.h"
Expand Down Expand Up @@ -482,6 +483,13 @@ void PlanFragmentExecutor::report_profile() {
VLOG_FILE << "exiting reporting thread: instance_id=" << _runtime_state->fragment_instance_id();
}

// Returns the load error URL for this executor: the path reported by
// _runtime_state->get_error_log_file_path(), converted with
// to_load_error_http_path(). Returns an empty string when that path is empty.
std::string PlanFragmentExecutor::get_load_error_url() {
    const auto& error_log_path = _runtime_state->get_error_log_file_path();
    if (error_log_path.empty()) {
        // Nothing was written to an error log, so there is no URL to report.
        return "";
    }
    return to_load_error_http_path(error_log_path);
}

void PlanFragmentExecutor::send_report(bool done) {
Status status = Status::OK();
{
Expand All @@ -502,6 +510,11 @@ void PlanFragmentExecutor::send_report(bool done) {
if (!_is_report_success && !_is_report_on_cancel) {
return;
}

std::string load_eror_url = _query_ctx->get_load_error_url().empty()
? get_load_error_url()
: _query_ctx->get_load_error_url();

ReportStatusRequest report_req = {
false,
status,
Expand All @@ -515,6 +528,7 @@ void PlanFragmentExecutor::send_report(bool done) {
_fragment_instance_id,
_backend_num,
_runtime_state.get(),
load_eror_url,
std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1),
std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1,
std::placeholders::_2)};
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/plan_fragment_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class PlanFragmentExecutor : public TaskExecutionContext {

Status update_status(Status status);

std::string get_load_error_url();

private:
ExecEnv* _exec_env = nullptr; // not owned
ExecNode* _plan = nullptr; // lives in _runtime_state->obj_pool()
Expand Down
10 changes: 10 additions & 0 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ void QueryContext::cancel(std::string msg, Status new_status, int fragment_id) {
}
}

// Stores `error_url` as this query's load error URL.
//
// The write happens while holding _error_url_lock, the same mutex that
// get_load_error_url() takes before reading the member.
//
// @param error_url URL to store; taken by value, so the caller's string is
//                  never modified.
void QueryContext::set_load_error_url(std::string error_url) {
    std::lock_guard<std::mutex> lock(_error_url_lock);
    // `error_url` is already a private copy. Take over its buffer instead of
    // copy-assigning, which would duplicate the string a second time while the
    // lock is held. The previous value ends up in the parameter and is
    // destroyed with it.
    _load_error_url.swap(error_url);
}

// Returns a copy of the stored load error URL. The copy is made while holding
// _error_url_lock, so the caller receives an independent string.
std::string QueryContext::get_load_error_url() {
    std::lock_guard<std::mutex> guard(_error_url_lock);
    std::string url_snapshot = _load_error_url;
    return url_snapshot;
}

void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
set_execution_dependency_ready();
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct ReportStatusRequest {
TUniqueId fragment_instance_id;
int backend_num;
RuntimeState* runtime_state;
std::string load_error_url;
std::function<Status(Status)> update_fn;
std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn;
};
Expand Down Expand Up @@ -330,6 +331,9 @@ class QueryContext {
fragment_instance_ids.push_back(ins_id);
}

void set_load_error_url(std::string error_url);
std::string get_load_error_url();

private:
std::mutex _ins_lock;
TUniqueId _query_id;
Expand Down Expand Up @@ -392,6 +396,9 @@ class QueryContext {
std::mutex _brpc_stubs_mutex;
std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> _using_brpc_stubs;

std::mutex _error_url_lock;
std::string _load_error_url;

public:
timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
QuerySource get_query_source() const { return this->_query_source; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ suite("test_etl_failed", "load_p0") {
assertTrue(1 == 2, "etl should be failed")
break;
}
if (result[0][2].equals("CANCELLED") && result[0][13].contains("_load_error_log")) {
if (result[0][2].equals("CANCELLED")) {
def reason = result[0][7]
logger.info("load failed, ErrorMsg: $reason");
if (result[0][7] != null && result[0][7].contains("ETL_QUALITY_UNSATISFIED" ) && !result[0][13].contains("_load_error_log")) {
assertTrue(1 == 2, "etl should be failed")
break;
}
break;
}
Thread.sleep(1000)
Expand Down