diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2696400ca1db17..c621738c1a3ce1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -273,6 +273,8 @@ DEFINE_mInt32(doris_scan_block_max_mb, "67108864"); DEFINE_mInt32(doris_scanner_row_num, "16384"); // single read execute fragment row bytes DEFINE_mInt32(doris_scanner_row_bytes, "10485760"); +// single read execute fragment max run time milliseconds +DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000"); DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864"); // (Advanced) Maximum size of per-query receive-side buffer DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760"); diff --git a/be/src/common/config.h b/be/src/common/config.h index c9801275424afd..0b5a282f26fbf0 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -323,6 +323,8 @@ DECLARE_mInt32(doris_scan_block_max_mb); DECLARE_mInt32(doris_scanner_row_num); // single read execute fragment row bytes DECLARE_mInt32(doris_scanner_row_bytes); +// single read execute fragment max run time milliseconds +DECLARE_mInt32(doris_scanner_max_run_time_ms); DECLARE_mInt32(min_bytes_in_scanner_queue); // (Advanced) Maximum size of per-query receive-side buffer DECLARE_mInt32(exchg_node_buffer_size_bytes); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 576ed58a284f8f..b601064daeb1e4 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re columns_to_filter[i] = i; } IColumn::Filter result_filter; + size_t pre_raw_read_rows = 0; while (!_state->is_cancelled()) { // read predicate columns pre_read_rows = 0; @@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re DCHECK_EQ(pre_eof, true); break; } + pre_raw_read_rows += 
pre_read_rows; RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows, _lazy_read_ctx.predicate_partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows, @@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re Block::erase_useless_column(block, origin_column_num); if (!pre_eof) { + if (pre_raw_read_rows >= config::doris_scanner_row_num) { + break; + } // If continuous batches are skipped, we can cache them to skip a whole page _cached_filtered_rows += pre_read_rows; } else { // pre_eof diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 7f868fba5a666e..f3f91ec6f2da0b 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -232,6 +232,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, Thread::set_thread_nice_value(); } #endif + MonotonicStopWatch max_run_time_watch; + max_run_time_watch.start(); scanner->update_wait_worker_timer(); scanner->start_scan_cpu_timer(); Status status = Status::OK(); @@ -266,6 +268,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, eos = true; break; } + if (max_run_time_watch.elapsed_time() > + config::doris_scanner_max_run_time_ms * 1e6) { + break; + } BlockUPtr free_block = ctx->get_free_block(first_read); if (free_block == nullptr) { break; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 899c618dcc5045..52375e2e9c011a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* // or not found in the file column schema. RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); } - break; } + break; } while (true); // Update filtered rows and unselected rows for load, reset counter.