Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ DEFINE_mInt32(doris_scan_range_max_mb, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// single read execute fragment max run time, in milliseconds
DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ DECLARE_mInt32(doris_scan_range_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
// single read execute fragment max run time, in milliseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
columns_to_filter[i] = i;
}
IColumn::Filter result_filter;
size_t pre_raw_read_rows = 0;
while (!_state->is_cancelled()) {
// read predicate columns
pre_read_rows = 0;
Expand All @@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
DCHECK_EQ(pre_eof, true);
break;
}
pre_raw_read_rows += pre_read_rows;
RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
_lazy_read_ctx.predicate_partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
Expand Down Expand Up @@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
Block::erase_useless_column(block, origin_column_num);

if (!pre_eof) {
if (pre_raw_read_rows >= config::doris_scanner_row_num) {
break;
}
// If continuous batches are skipped, we can cache them to skip a whole page
_cached_filtered_rows += pre_read_rows;
} else { // pre_eof
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
Thread::set_thread_nice_value();
}
#endif
MonotonicStopWatch max_run_time_watch;
max_run_time_watch.start();
scanner->update_wait_worker_timer();
scanner->start_scan_cpu_timer();
Status status = Status::OK();
Expand Down Expand Up @@ -270,6 +272,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
eos = true;
break;
}
if (max_run_time_watch.elapsed_time() >
config::doris_scanner_max_run_time_ms * 1e6) {
break;
}
BlockUPtr free_block = ctx->get_free_block(first_read);
if (free_block == nullptr) {
break;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool*
// or not found in the file column schema.
RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
}
break;
}
break;
} while (true);

// Update filtered rows and unselected rows for load, reset counter.
Expand Down