From c13444675247d439e8bc90e0a75b6000cda5608b Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 11 Sep 2024 19:48:43 +0800 Subject: [PATCH 1/3] [fix](pipelinex) fix fragment instance progress reports (part 2) --- be/src/runtime/fragment_mgr.cpp | 18 +++++++++++++++++- .../java/org/apache/doris/qe/Coordinator.java | 6 +++--- gensrc/thrift/FrontendService.thrift | 2 ++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 4af6b72a220c78..f0c3a67a645d19 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -329,6 +329,18 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { // this is a load plan, and load is not finished, just make a brief report params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); + for (auto* rs : req.runtime_states) { + if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 || + rs->num_finished_range() > 0) { + params.__isset.fragment_instance_reports = true; + TFragmentInstanceReport t; + t.__set_fragment_instance_id(rs->fragment_instance_id()); + t.__set_num_finished_range(rs->num_finished_range()); + t.__set_loaded_rows(rs->num_rows_load_total()); + t.__set_loaded_bytes(rs->num_bytes_load_total()); + params.fragment_instance_reports.push_back(t); + } + } } else { if (req.runtime_state->query_type() == TQueryType::LOAD) { params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); @@ -405,11 +417,13 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { TFragmentInstanceReport t; t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id()); t.__set_num_finished_range(req.runtime_state->num_finished_range()); + t.__set_loaded_rows(req.runtime_state->num_rows_load_total()); + t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); params.fragment_instance_reports.push_back(t); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 || - req.runtime_state->num_finished_range() > 0) { + rs->num_finished_range() > 0) { params.__isset.load_counters = true; num_rows_load_success += rs->num_rows_load_success(); num_rows_load_filtered += rs->num_rows_load_filtered(); @@ -418,6 +432,8 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { TFragmentInstanceReport t; t.__set_fragment_instance_id(rs->fragment_instance_id()); t.__set_num_finished_range(rs->num_finished_range()); + t.__set_loaded_rows(rs->num_rows_load_total()); + t.__set_loaded_bytes(rs->num_bytes_load_total()); params.fragment_instance_reports.push_back(t); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 5e3a59d9a54d96..a7472191488e01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2464,7 +2464,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) { Env.getCurrentEnv().getLoadManager().updateJobProgress( jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(), - params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); + report.getLoadedRows(), report.getLoadedBytes(), params.isDone()); Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId), params.getQueryId(), report.getFragmentInstanceId(), report.getNumFinishedRange()); } @@ -2888,9 +2888,9 @@ public void unsetFields() { // Has to use synchronized to ensure there are not concurrent update threads. Or the done // state maybe update wrong and will lose data. see https://github.com/apache/doris/pull/29802/files. public synchronized boolean updatePipelineStatus(TReportExecStatusParams params) { - // The fragment or instance is not finished, not need update + // The fragment or instance is not finished, still need update progress if (!params.done) { - return false; + return true; } if (this.done) { // duplicate packet diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 9077dbd3cec2c8..330b786dacca20 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -437,6 +437,8 @@ struct TQueryProfile { struct TFragmentInstanceReport { 1: optional Types.TUniqueId fragment_instance_id; 2: optional i32 num_finished_range; + 3: optional i64 loaded_rows + 4: optional i64 loaded_bytes } From 0e4fe7a34224262315c5e43111c3e79ef7a50734 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 11 Sep 2024 20:02:05 +0800 Subject: [PATCH 2/3] update comments --- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a7472191488e01..20e17fe54778f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2888,7 +2888,8 @@ public void unsetFields() { // Has to use synchronized to ensure there are not concurrent update threads. Or the done // state maybe update wrong and will lose data. see https://github.com/apache/doris/pull/29802/files. public synchronized boolean updatePipelineStatus(TReportExecStatusParams params) { - // The fragment or instance is not finished, still need update progress + // The fragment or instance is not finished, still need handle the report to update the load progress. + // Otherwise the load progress (ScannedRows/LoadBytes) won't be updated before any instance is done. if (!params.done) { return true; } From af86c63d56915c2a013230b95c7d4170d49338f4 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Sat, 14 Sep 2024 17:06:01 +0800 Subject: [PATCH 3/3] revert the non-done progress report fix --- .../src/main/java/org/apache/doris/qe/Coordinator.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 20e17fe54778f8..97a190a554d60f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2888,10 +2888,9 @@ public void unsetFields() { // Has to use synchronized to ensure there are not concurrent update threads. Or the done // state maybe update wrong and will lose data. see https://github.com/apache/doris/pull/29802/files. public synchronized boolean updatePipelineStatus(TReportExecStatusParams params) { - // The fragment or instance is not finished, still need handle the report to update the load progress. - // Otherwise the load progress (ScannedRows/LoadBytes) won't be updated before any instance is done. + // The fragment or instance is not finished, not need update if (!params.done) { - return true; + return false; } if (this.done) { // duplicate packet