Merged
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/scan_operator.cpp
@@ -1040,7 +1040,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipXScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p.ignore_data_distribution());
_scan_dependency, p.ignore_data_distribution(), p.is_file_scan_operator());
return Status::OK();
}

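For orientation, the new argument comes from the scan operator itself: pipelineX asks the operator whether it is a file scan and forwards the answer into the scanner context. A minimal, hypothetical sketch of what such an accessor can look like (class names and layout here are illustrative, not the actual Doris declarations):

    // Hypothetical sketch only: the real Doris operator classes differ.
    struct ScanOperatorSketch {
        virtual ~ScanOperatorSketch() = default;
        // A plain OLAP scan is not a file scan.
        virtual bool is_file_scan_operator() const { return false; }
    };

    struct FileScanOperatorSketch : ScanOperatorSketch {
        // External-table scans (Parquet/ORC/CSV, ...) report true.
        bool is_file_scan_operator() const override { return true; }
    };

With an accessor like this, p.is_file_scan_operator() above is true only for file scans, which is what lets the scanner context pick the smaller thread budget described further down.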
21 changes: 17 additions & 4 deletions be/src/vec/exec/scan/pip_scanner_context.h
@@ -32,8 +32,20 @@ class PipScannerContext final : public vectorized::ScannerContext {
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, bool ignore_data_distribution)
: vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor,
scanners, limit_, ignore_data_distribution) {}
            : vectorized::ScannerContext(
                      state, parent, output_tuple_desc, output_row_descriptor, scanners, limit_,
                      ignore_data_distribution,
                      /* The non-pipeline and old pipeline engines do not handle the file scan
                         operator separately; they use state->query_parallel_instance_num() as
                         num_parallel_instances, see:
                             _max_thread_num = _state->num_scanner_threads() > 0
                                     ? _state->num_scanner_threads()
                                     : config::doris_scanner_thread_pool_thread_num /
                                               (_local_state ? num_parallel_instances
                                                             : state->query_parallel_instance_num());
                      */
                      // So we set is_file_scan_operator to true so that _max_thread_num stays
                      // the same as before for every engine except pipelineX.
                      true) {}
};
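To make the intent of passing true here concrete, a small worked example: assume doris_scanner_thread_pool_thread_num = 48 (a typical default) and query_parallel_instance_num = 8 (purely illustrative), with num_scanner_threads unset. The old formula quoted in the comment gives _max_thread_num = 48 / 8 = 6. After this patch, the non-pipelineX engines take the _is_file_scan_operator branch in ScannerContext::init() (and, assuming _ignore_data_distribution is false, skip the whole-pool path), which applies a scale factor of 1 and still computes 1 * (48 / 8) = 6. Their scanner thread budget is therefore unchanged; only pipelineX passes the operator's real file-scan flag.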

class PipXScannerContext final : public vectorized::ScannerContext {
@@ -45,9 +57,10 @@
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
bool ignore_data_distribution)
bool ignore_data_distribution, bool is_file_scan_operator)
: vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
limit_, ignore_data_distribution, local_state) {
limit_, ignore_data_distribution, is_file_scan_operator,
local_state) {
_dependency = dependency;
}

16 changes: 11 additions & 5 deletions be/src/vec/exec/scan/scanner_context.cpp
@@ -44,6 +44,7 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
int64_t limit_, bool ignore_data_distribution,
bool is_file_scan_operator,
pipeline::ScanLocalStateBase* local_state)
: HasTaskExecutionCtx(state),
_state(state),
@@ -56,7 +57,8 @@
limit(limit_),
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
_ignore_data_distribution(ignore_data_distribution) {
_ignore_data_distribution(ignore_data_distribution),
_is_file_scan_operator(is_file_scan_operator) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
@@ -77,9 +79,10 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
int64_t limit_, bool ignore_data_distribution,
bool is_file_scan_operator,
pipeline::ScanLocalStateBase* local_state)
: ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, limit_,
ignore_data_distribution, local_state) {
ignore_data_distribution, is_file_scan_operator, local_state) {
_parent = parent;

// No need to increase scanner_ctx_cnt here. Since other constructor has already done it.
@@ -160,7 +163,8 @@ Status ScannerContext::init() {
}

// _scanner_scheduler will be used to submit scan tasks.
if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) {
if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size ||
_is_file_scan_operator) {
submit_many_scan_tasks_for_potential_performance_issue = false;
}

@@ -183,8 +187,10 @@ Status ScannerContext::init() {
if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) {
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
} else {
_max_thread_num =
4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances);
const int scale_arg = _is_file_scan_operator ? 1 : 4;
_max_thread_num = scale_arg * (config::doris_scanner_thread_pool_thread_num /
num_parallel_instances);

        // In some rare cases, the user may set num_parallel_instances to 1 by hand so that many
        // queries can run in parallel. We need to make sure _max_thread_num is smaller than the previous value.
_max_thread_num =
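Putting the two init() hunks above together, the effective thread-budget choice after this change can be restated as a small, self-contained sketch (condensed from the diff, not verbatim; all inputs are passed in explicitly so the decision is easy to follow in isolation):

    // Condensed sketch of the thread-budget decision in ScannerContext::init() after this patch.
    static int pick_max_thread_num(bool submit_many_scan_tasks, bool ignore_data_distribution,
                                   bool is_file_scan_operator, bool scan_queue_half_full,
                                   int pool_thread_num, int num_parallel_instances) {
        // File scan operators never take the "submit many tasks" shortcut, and
        // neither does any scan once the global scan queue is already half full.
        if (scan_queue_half_full || is_file_scan_operator) {
            submit_many_scan_tasks = false;
        }
        if (submit_many_scan_tasks || ignore_data_distribution) {
            return pool_thread_num;  // use the whole scanner thread pool
        }
        // OLAP scans keep the 4x over-subscription factor; file scans fall back
        // to the pre-existing 1x budget per parallel instance.
        const int scale_arg = is_file_scan_operator ? 1 : 4;
        return scale_arg * (pool_thread_num / num_parallel_instances);
    }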
6 changes: 4 additions & 2 deletions be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
bool ignore_data_distribution,
bool ignore_data_distribution, bool is_file_scan_operator,
pipeline::ScanLocalStateBase* local_state = nullptr);

~ScannerContext() override {
@@ -183,7 +183,8 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners_, int64_t limit_,
bool ignore_data_distribution, pipeline::ScanLocalStateBase* local_state);
bool ignore_data_distribution, bool is_file_scan_operator,
pipeline::ScanLocalStateBase* local_state);

/// Four criteria to determine whether to increase the parallelism of the scanners
/// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
@@ -234,6 +235,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
QueryThreadContext _query_thread_context;
bool _ignore_data_distribution = false;
bool _is_file_scan_operator;
Contributor review comment on this line: should set default value
// for scaling up the running scanners
size_t _estimated_block_size = 0;
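The inline review comment above asks for a default value on the new member. A minimal way to address it, mirroring the neighbouring _ignore_data_distribution member, would be an in-class initializer (this only sketches the reviewer's suggestion; the diff itself does not contain it):

    bool _ignore_data_distribution = false;
    bool _is_file_scan_operator = false;  // default: behave like a non-file (OLAP) scan

A false default guards against an uninitialized read and matches the behaviour of a plain OLAP scan.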
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/vscan_node.cpp
@@ -321,7 +321,7 @@ void VScanNode::_start_scanners(const std::list<std::shared_ptr<ScannerDelegate>
} else {
_scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc,
_output_row_descriptor.get(), scanners,
limit(), false);
limit(), false, true);
}
}

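Because the two trailing booleans in this call are positional, they are easy to misread. The same call with the parameters spelled out in comments (annotations added here for clarity; they are not part of the diff):

    _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc,
                                                 _output_row_descriptor.get(), scanners,
                                                 limit(),
                                                 /*ignore_data_distribution=*/false,
                                                 /*is_file_scan_operator=*/true);

As in PipScannerContext above, the non-pipeline VScanNode path hard-codes is_file_scan_operator to true so that its _max_thread_num computation stays the same as before this patch.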