From 0c27e0bd513a5eb08184f1a0403ec0947c8707b3 Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Thu, 26 Sep 2024 22:48:54 +0800 Subject: [PATCH] [Opt](scanner-scheduler) Opt scanner scheduler starvation issue. (#40641) When a scanner scheduler thread is stuck executing one scan task, other scan tasks starve and have no chance to execute, which affects other queries. Currently, a scan task tries to scan as much data as possible in one scheduling round to reduce the overhead of scheduling switches: it keeps reading until it has obtained up to 10MB of data (`doris_scanner_row_bytes`). However, if a query scans a table with many rows of data but the filtering rate is very high, the filter discards most of the data and the task never accumulates 10MB. It keeps reading data and evaluating filter expressions, which causes other scan tasks to starve. The solution is to time each scan task with a `MonotonicStopWatch` and check the elapsed time against `doris_scanner_max_run_time_ms`. After executing for a maximum of 1s (the default), the task yields so that other tasks can run. Time-consuming work inside a scan task must likewise be done in slices; for example, the Parquet lazy-read loop now breaks out once it has read `doris_scanner_row_num` raw rows. 
--- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ be/src/vec/exec/format/parquet/vparquet_group_reader.cpp | 5 +++++ be/src/vec/exec/scan/scanner_scheduler.cpp | 6 ++++++ be/src/vec/exec/scan/vfile_scanner.cpp | 2 +- 5 files changed, 16 insertions(+), 1 deletion(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2696400ca1db17..c621738c1a3ce1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -273,6 +273,8 @@ DEFINE_mInt32(doris_scan_block_max_mb, "67108864"); DEFINE_mInt32(doris_scanner_row_num, "16384"); // single read execute fragment row bytes DEFINE_mInt32(doris_scanner_row_bytes, "10485760"); +// single read execute fragment max run time milliseconds +DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000"); DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864"); // (Advanced) Maximum size of per-query receive-side buffer DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760"); diff --git a/be/src/common/config.h b/be/src/common/config.h index c9801275424afd..0b5a282f26fbf0 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -323,6 +323,8 @@ DECLARE_mInt32(doris_scan_block_max_mb); DECLARE_mInt32(doris_scanner_row_num); // single read execute fragment row bytes DECLARE_mInt32(doris_scanner_row_bytes); +// single read execute fragment max run time milliseconds +DECLARE_mInt32(doris_scanner_max_run_time_ms); DECLARE_mInt32(min_bytes_in_scanner_queue); // (Advanced) Maximum size of per-query receive-side buffer DECLARE_mInt32(exchg_node_buffer_size_bytes); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 576ed58a284f8f..b601064daeb1e4 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re columns_to_filter[i] = i; } IColumn::Filter 
result_filter; + size_t pre_raw_read_rows = 0; while (!_state->is_cancelled()) { // read predicate columns pre_read_rows = 0; @@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re DCHECK_EQ(pre_eof, true); break; } + pre_raw_read_rows += pre_read_rows; RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows, _lazy_read_ctx.predicate_partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows, @@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re Block::erase_useless_column(block, origin_column_num); if (!pre_eof) { + if (pre_raw_read_rows >= config::doris_scanner_row_num) { + break; + } // If continuous batches are skipped, we can cache them to skip a whole page _cached_filtered_rows += pre_read_rows; } else { // pre_eof diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 7f868fba5a666e..f3f91ec6f2da0b 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -232,6 +232,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, Thread::set_thread_nice_value(); } #endif + MonotonicStopWatch max_run_time_watch; + max_run_time_watch.start(); scanner->update_wait_worker_timer(); scanner->start_scan_cpu_timer(); Status status = Status::OK(); @@ -266,6 +268,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, eos = true; break; } + if (max_run_time_watch.elapsed_time() > + config::doris_scanner_max_run_time_ms * 1e6) { + break; + } BlockUPtr free_block = ctx->get_free_block(first_read); if (free_block == nullptr) { break; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 899c618dcc5045..52375e2e9c011a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* 
block, bool* // or not found in the file column schema. RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); } - break; } + break; } while (true); // Update filtered rows and unselected rows for load, reset counter.