Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p.is_serial_operator());
_scan_dependency, p.is_serial_operator(), p.is_file_scan_operator());
return Status::OK();
}

Expand Down
16 changes: 11 additions & 5 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ ScannerContext::ScannerContext(
RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution)
std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution,
bool is_file_scan_operator)
: HasTaskExecutionCtx(state),
_state(state),
_local_state(local_state),
Expand All @@ -58,7 +59,8 @@ ScannerContext::ScannerContext(
limit(limit_),
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
_ignore_data_distribution(ignore_data_distribution) {
_ignore_data_distribution(ignore_data_distribution),
_is_file_scan_operator(is_file_scan_operator) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
Expand Down Expand Up @@ -143,7 +145,10 @@ Status ScannerContext::init() {
}

// _scanner_scheduler will be used to submit scan tasks.
if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) {
// file_scan_operator currently has a performance issue if we submit too many scan tasks to the scheduler.
// We should fix this problem in the future.
if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size ||
_is_file_scan_operator) {
submit_many_scan_tasks_for_potential_performance_issue = false;
}

Expand All @@ -166,8 +171,9 @@ Status ScannerContext::init() {
if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) {
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
} else {
_max_thread_num =
4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances);
const size_t factor = _is_file_scan_operator ? 1 : 4;
_max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num /
num_parallel_instances);
// In some rare cases, the user may manually set num_parallel_instances to 1 so that many queries can be executed
// in parallel. We need to make sure that _max_thread_num is smaller than the previous value.
_max_thread_num =
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
bool ignore_data_distribution);
bool ignore_data_distribution, bool is_file_scan_operator);

~ScannerContext() override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
Expand Down Expand Up @@ -214,6 +214,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
QueryThreadContext _query_thread_context;
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
bool _ignore_data_distribution = false;
bool _is_file_scan_operator = false;

// for scaling up the running scanners
size_t _estimated_block_size = 0;
Expand Down