diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 61c0ecdea2f823..ae04579c5260ec 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -508,7 +508,7 @@ DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1");
 DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
 
 // The maximum amount of data that can be processed by a stream load
-DEFINE_mInt64(streaming_load_max_mb, "10240");
+DEFINE_mInt64(streaming_load_max_mb, "102400");
 // Some data formats, such as JSON, cannot be streamed.
 // Therefore, it is necessary to limit the maximum number of
 // such data when using stream load to prevent excessive memory consumption.
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 102846cbcf47e1..1c0cdffc0f56f8 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1339,6 +1339,7 @@ void IRuntimeFilter::signal() {
 }
 
 void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
+    std::unique_lock lock(_inner_mutex);
     _filter_timer.push_back(timer);
 }
 
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index ab5b7e36bc2ad5..4693f8343fde88 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -156,7 +156,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
     void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
 
     void incr_created_tasks() { _num_tasks_created++; }
-    bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
     void set_num_tasks(int num_tasks) {
         _num_tasks = num_tasks;
         for (auto& op : operatorXs) {
@@ -243,7 +242,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
     // How many tasks should be created?
     int _num_tasks = 1;
     // How many tasks are already created?
-    int _num_tasks_created = 0;
+    std::atomic<int> _num_tasks_created = 0;
 };
 
 } // namespace doris::pipeline
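The two hunks above share one motivation: once fragment prepare is fanned out to a thread pool (see the fragment-context changes below), `incr_created_tasks()` and `set_filter_timer()` may be called from several prepare threads at once, so the plain `int` counter and the unguarded `push_back` would be data races. A minimal standalone sketch of that pattern, with illustrative names rather than Doris APIs:

    #include <atomic>
    #include <mutex>
    #include <thread>
    #include <vector>

    // Stand-in for state shared by concurrent prepare threads.
    struct SharedPrepareState {
        std::atomic<int> num_tasks_created{0};  // a plain int here would race
        std::mutex timer_mutex;
        std::vector<int> filter_timers;         // stands in for _filter_timer

        void incr_created_tasks() { num_tasks_created.fetch_add(1); }

        void set_filter_timer(int timer) {
            std::unique_lock<std::mutex> lock(timer_mutex);  // same fix as above
            filter_timers.push_back(timer);
        }
    };

    int main() {
        SharedPrepareState state;
        std::vector<std::thread> workers;
        for (int i = 0; i < 8; ++i) {
            workers.emplace_back([&state, i] {
                state.incr_created_tasks();
                state.set_filter_timer(i);
            });
        }
        for (auto& t : workers) {
            t.join();
        }
        return state.num_tasks_created.load() == 8 ? 0 : 1;
    }
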
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index b8d192ac096c51..f60cb80359ab32 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -89,7 +89,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
 
     Status prepare(const doris::TPipelineFragmentParams& request, size_t idx);
 
-    virtual Status prepare(const doris::TPipelineFragmentParams& request) {
+    virtual Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
         return Status::InternalError("Pipeline fragment context do not implement prepare");
     }
 
@@ -168,7 +168,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
 
     int _closed_tasks = 0;
     // After prepared, `_total_tasks` is equal to the size of `_tasks`.
     // When submit fail, `_total_tasks` is equal to the number of tasks submitted.
-    int _total_tasks = 0;
+    std::atomic<int> _total_tasks = 0;
 
     int32_t _next_operator_builder_id = 10000;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index d9a1d9368006c3..ac527ed8e69888 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -116,9 +116,13 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
     auto st = _query_ctx->exec_status();
     _tasks.clear();
     if (!_task_runtime_states.empty()) {
-        for (auto& runtime_state : _task_runtime_states) {
-            _call_back(runtime_state.get(), &st);
-            runtime_state.reset();
+        for (auto& runtime_states : _task_runtime_states) {
+            for (auto& runtime_state : runtime_states) {
+                if (runtime_state) {
+                    _call_back(runtime_state.get(), &st);
+                    runtime_state.reset();
+                }
+            }
         }
     } else {
         _call_back(nullptr, &st);
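The destructor now walks a two-dimensional table because `_task_runtime_states` changes from a flat list indexed by task id to a pipeline-by-instance table (see the header change further down). Pipelines that run a single task fill only instance slot 0, so every consumer has to null-check each slot. A hedged sketch of the new layout, with simplified types:

    #include <memory>
    #include <vector>

    struct RuntimeStateStub {};  // stands in for doris::RuntimeState

    int main() {
        size_t num_pipelines = 3;
        size_t num_instances = 4;

        // _task_runtime_states[pip_idx][instance_idx], preallocated so that
        // concurrent prepare threads can fill disjoint slots without resizing.
        std::vector<std::vector<std::unique_ptr<RuntimeStateStub>>> states(num_pipelines);
        for (auto& per_pipeline : states) {
            per_pipeline.resize(num_instances);
        }

        // A serial pipeline creates a task only for instance 0; the rest stay null.
        states[1][0] = std::make_unique<RuntimeStateStub>();

        // Consumers (the destructor, send_report) therefore skip null entries.
        int live = 0;
        for (auto& per_pipeline : states) {
            for (auto& state : per_pipeline) {
                if (state) {
                    ++live;
                }
            }
        }
        return live == 1 ? 0 : 1;
    }
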
@@ -182,7 +186,8 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     }
 }
 
-Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {
+Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
+                                         ThreadPool* thread_pool) {
     if (_prepared) {
         return Status::InternalError("Already prepared");
     }
@@ -210,7 +215,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     _runtime_state = RuntimeState::create_unique(request.query_id, request.fragment_id,
                                                  request.query_options, _query_ctx->query_globals,
                                                  _exec_env, _query_ctx.get());
-    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
 
     if (request.__isset.backend_id) {
         _runtime_state->set_backend_id(request.backend_id);
@@ -284,7 +288,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     }
 
     // 5. Build pipeline tasks and initialize local state.
-    RETURN_IF_ERROR(_build_pipeline_tasks(request));
+    RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));
 
     _init_next_report_time();
 
@@ -511,11 +515,17 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
     return Status::OK();
 }
 
-Status PipelineXFragmentContext::_build_pipeline_tasks(
-        const doris::TPipelineFragmentParams& request) {
+Status PipelineXFragmentContext::_build_pipeline_x_tasks(
+        const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
     _total_tasks = 0;
     int target_size = request.local_params.size();
     _tasks.resize(target_size);
+    _fragment_instance_ids.resize(target_size);
+    _runtime_filter_states.resize(target_size);
+    _task_runtime_states.resize(_pipelines.size());
+    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+        _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+    }
     auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
     DCHECK(pipeline_id_to_profile.empty());
     pipeline_id_to_profile.resize(_pipelines.size());
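The new `resize` calls above replace `push_back` during per-instance setup: once instance preparation can run on several thread-pool workers at once (see the `pre_and_submit` lambda in the next hunk), growing a `std::vector` concurrently would be a data race, while writing to disjoint, preallocated slots needs no synchronization. A minimal sketch of the idea (not Doris code):

    #include <string>
    #include <thread>
    #include <vector>

    int main() {
        const int target_size = 4;

        // Preallocate up front, like _fragment_instance_ids.resize(target_size);
        // after this point no thread changes the vector's size.
        std::vector<std::string> fragment_instance_ids(target_size);

        std::vector<std::thread> workers;
        for (int i = 0; i < target_size; i++) {
            workers.emplace_back([&, i] {
                // Each worker writes only slot i, so no two threads
                // ever touch the same element.
                fragment_instance_ids[i] = "instance-" + std::to_string(i);
            });
        }
        for (auto& t : workers) {
            t.join();
        }
        return fragment_instance_ids.back().empty() ? 1 : 0;
    }
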
@@ -528,10 +538,10 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         }
     }
 
-    for (size_t i = 0; i < target_size; i++) {
+    auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) {
         const auto& local_params = request.local_params[i];
         auto fragment_instance_id = local_params.fragment_instance_id;
-        _fragment_instance_ids.push_back(fragment_instance_id);
+        _fragment_instance_ids[i] = fragment_instance_id;
         std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
         auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
             runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
@@ -588,7 +598,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
 
         filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
 
-        _runtime_filter_states.push_back(std::move(filterparams));
+        _runtime_filter_states[i] = std::move(filterparams);
         std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
         auto get_local_exchange_state = [&](PipelinePtr pipeline)
                 -> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
@@ -608,32 +618,30 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             }
             return le_state_map;
         };
-        auto get_task_runtime_state = [&](int task_id) -> RuntimeState* {
-            DCHECK(_task_runtime_states[task_id]);
-            return _task_runtime_states[task_id].get();
-        };
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto& pipeline = _pipelines[pip_idx];
-            if (pipeline->need_to_create_task()) {
+            if (pipeline->num_tasks() > 1 || i == 0) {
+                auto cur_task_id = _total_tasks++;
+                DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
+                        << print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
+                        << pipeline->debug_string();
                 // build task runtime state
-                _task_runtime_states.push_back(RuntimeState::create_unique(
+                _task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
                         this, local_params.fragment_instance_id, request.query_id,
                         request.fragment_id, request.query_options, _query_ctx->query_globals,
-                        _exec_env, _query_ctx.get()));
-                auto& task_runtime_state = _task_runtime_states.back();
+                        _exec_env, _query_ctx.get());
+                auto& task_runtime_state = _task_runtime_states[pip_idx][i];
                 init_runtime_state(task_runtime_state);
-                auto cur_task_id = _total_tasks++;
                 task_runtime_state->set_task_id(cur_task_id);
                 task_runtime_state->set_task_num(pipeline->num_tasks());
-                auto task = std::make_unique<PipelineXTask>(
-                        pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this,
-                        pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
-                        i);
+                auto task = std::make_unique<PipelineXTask>(pipeline, cur_task_id,
+                                                            task_runtime_state.get(), ctx,
+                                                            pipeline_id_to_profile[pip_idx].get(),
+                                                            get_local_exchange_state(pipeline), i);
                 pipeline_id_to_task.insert({pipeline->id(), task.get()});
                 _tasks[i].emplace_back(std::move(task));
             }
         }
-
         /**
          * Build DAG for pipeline tasks.
          * For example, we have
@@ -693,6 +701,38 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             std::lock_guard l(_state_map_lock);
             _runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr);
         }
+        return Status::OK();
+    };
+
+    if (target_size > 1 &&
+        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
+         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
+        Status prepare_status[target_size];
+        std::mutex m;
+        std::condition_variable cv;
+        int prepare_done = 0;
+        for (size_t i = 0; i < target_size; i++) {
+            RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+                SCOPED_ATTACH_TASK(_query_ctx.get());
+                prepare_status[i] = pre_and_submit(i, this);
+                std::unique_lock lock(m);
+                prepare_done++;
+                if (prepare_done == target_size) {
+                    cv.notify_one();
+                }
+            }));
+        }
+        std::unique_lock lock(m);
+        cv.wait(lock, [&]() { return prepare_done == target_size; });
+        for (size_t i = 0; i < target_size; i++) {
+            if (!prepare_status[i].ok()) {
+                return prepare_status[i];
+            }
+        }
+    } else {
+        for (size_t i = 0; i < target_size; i++) {
+            RETURN_IF_ERROR(pre_and_submit(i, this));
+        }
     }
     _pipeline_parent_map.clear();
     _dag.clear();
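The parallel branch above is a fan-out/join: one `pre_and_submit` per instance is handed to the thread pool, completions are counted under a mutex, the last worker wakes the coordinating thread, and only then are the per-instance statuses inspected. A standalone sketch of the same pattern, with `std::thread` standing in for Doris's thread pool; the predicate form of `wait` used here (and in the hunk above) also guards against spurious wakeups and against all workers finishing before the coordinator starts waiting:

    #include <condition_variable>
    #include <mutex>
    #include <thread>
    #include <vector>

    int main() {
        const int target_size = 8;
        std::vector<int> status(target_size, -1);  // stands in for prepare_status[]
        std::mutex m;
        std::condition_variable cv;
        int prepare_done = 0;

        std::vector<std::thread> pool;
        for (int i = 0; i < target_size; i++) {
            pool.emplace_back([&, i] {
                status[i] = 0;  // per-instance prepare work happens here
                std::unique_lock<std::mutex> lock(m);
                if (++prepare_done == target_size) {
                    cv.notify_one();  // the last finisher wakes the coordinator
                }
            });
        }

        {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return prepare_done == target_size; });
        }
        for (auto& t : pool) {
            t.join();
        }

        // Statuses are checked only after every worker has finished.
        for (int i = 0; i < target_size; i++) {
            if (status[i] != 0) {
                return 1;
            }
        }
        return 0;
    }
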
@@ -1512,8 +1554,12 @@ Status PipelineXFragmentContext::send_report(bool done) {
     std::vector<RuntimeState*> runtime_states;
 
-    for (auto& task_state : _task_runtime_states) {
-        runtime_states.push_back(task_state.get());
+    for (auto& task_states : _task_runtime_states) {
+        for (auto& task_state : task_states) {
+            if (task_state) {
+                runtime_states.push_back(task_state.get());
+            }
+        }
     }
 
     return _report_status_cb(
             {true, exec_status, runtime_states, nullptr, _runtime_state->load_channel_profile(),
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index a695f73d28333a..8a09f64ac6b902 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -94,7 +94,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
     }
 
     // Prepare global information including global states and the unique operator tree shared by all pipeline tasks.
-    Status prepare(const doris::TPipelineFragmentParams& request) override;
+    Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) override;
 
     Status submit() override;
 
@@ -118,7 +118,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
 private:
     void _close_fragment_instance() override;
 
-    Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
+    Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request,
+                                   ThreadPool* thread_pool);
 
     Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
                                PipelinePtr cur_pipe, DataDistribution data_distribution,
                                bool* do_local_exchange, int num_buckets,
@@ -230,7 +231,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
     std::vector<TUniqueId> _fragment_instance_ids;
 
     // Local runtime states for each task
-    std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
+    std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;
 
     std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 57f0e51a2b728f..5617861a719e08 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -962,7 +962,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                                        std::placeholders::_1, std::placeholders::_2));
     {
         SCOPED_RAW_TIMER(&duration_ns);
-        auto prepare_st = context->prepare(params);
+        auto prepare_st = context->prepare(params, _thread_pool.get());
         if (!prepare_st.ok()) {
             context->close_if_prepare_failed(prepare_st);
             query_ctx->set_execution_dependency_ready();
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index ca731b48b37478..44124ea7954f5a 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -126,9 +126,6 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
         _is_local &= state->query_options().enable_local_exchange;
     }
     if (_is_local) {
-        WARN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
-                              _fragment_instance_id, _dest_node_id, &_local_recvr),
-                      "");
         return Status::OK();
     }
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
@@ -149,6 +146,11 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
 
 template <typename Parent>
 Status Channel<Parent>::open(RuntimeState* state) {
+    if (_is_local) {
+        WARN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr(
+                              _fragment_instance_id, _dest_node_id, &_local_recvr),
+                      "");
+    }
     _be_number = state->be_number();
     _brpc_request = std::make_shared<PTransmitDataParams>();
     // initialize brpc request
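Moving `find_recvr` from `init_stub()` to `open()` fits the same theme: channels are now built while sibling instances of the query may still be preparing, which suggests the lookup could otherwise race with receiver registration, so it is deferred to `open()`, which runs after prepare completes. A sketch of that ordering with invented registry and channel types (not the `VDataStreamMgr` API):

    #include <map>
    #include <memory>
    #include <string>

    struct Receiver {};

    struct StreamRegistry {  // invented stand-in for the receiver registry
        std::map<std::string, std::shared_ptr<Receiver>> receivers;
        std::shared_ptr<Receiver> find_recvr(const std::string& id) {
            auto it = receivers.find(id);
            return it == receivers.end() ? nullptr : it->second;
        }
    };

    struct LocalChannel {
        std::string dest_id;
        std::shared_ptr<Receiver> local_recvr;

        // Runs during (possibly parallel) prepare: the destination instance
        // may not have registered its receiver yet, so nothing is looked up.
        void init_stub(StreamRegistry&) {}

        // Runs after all instances have prepared; the lookup is safe now.
        void open(StreamRegistry& registry) { local_recvr = registry.find_recvr(dest_id); }
    };

    int main() {
        StreamRegistry registry;
        LocalChannel channel;
        channel.dest_id = "instance-1";
        channel.init_stub(registry);  // receiver absent: fine
        registry.receivers["instance-1"] = std::make_shared<Receiver>();
        channel.open(registry);       // lookup succeeds
        return channel.local_recvr ? 0 : 1;
    }
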
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d6fb1617fe7d28..9b970ad80d49d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -279,6 +279,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String AUTO_BROADCAST_JOIN_THRESHOLD = "auto_broadcast_join_threshold";
 
+    public static final String PARALLEL_PREPARE_THRESHOLD = "parallel_prepare_threshold";
+
     public static final String ENABLE_PROJECTION = "enable_projection";
 
     public static final String CHECK_OVERFLOW_FOR_DECIMAL = "check_overflow_for_decimal";
@@ -1004,7 +1006,7 @@ public class SessionVariable implements Serializable, Writable {
 
     @VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = true,
            varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
-    private long parallelScanMinRowsPerScanner = 16384; // 16K
+    private long parallelScanMinRowsPerScanner = 2097152; // 2M
 
     @VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
             varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@@ -1044,6 +1046,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD)
     public double autoBroadcastJoinThreshold = 0.8;
 
+    @VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD)
+    public int parallelPrepareThreshold = 32;
+
     @VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
     private boolean enableJoinReorderBasedCost = false;
 
@@ -3426,6 +3431,7 @@ public TQueryOptions toThrift() {
         tResult.setNumScannerThreads(numScannerThreads);
         tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
         tResult.setMaxColumnReaderNum(maxColumnReaderNum);
+        tResult.setParallelPrepareThreshold(parallelPrepareThreshold);
 
         // TODO chenhao, reservation will be calculated by cost
         tResult.setMinReservation(0);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 4e35d6658d09cc..aae04ff3b3904f 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -329,6 +329,8 @@ struct TQueryOptions {
   130: optional bool enable_adaptive_pipeline_task_serial_read_on_limit = true;
   131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;
 
+  // only in 2.1
+  999: optional i32 parallel_prepare_threshold = 0;
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false
 }
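End to end, `parallel_prepare_threshold` is an FE session variable (default 32), serialized into `TQueryOptions` as field 999 above, and consulted in `_build_pipeline_x_tasks` when deciding whether to fan prepare out to the thread pool. A hedged sketch of the BE-side gate, with simplified types rather than the generated thrift structs:

    #include <cstdio>

    // Simplified stand-in for the generated TQueryOptions struct.
    struct QueryOptions {
        bool isset_parallel_prepare_threshold = false;
        int parallel_prepare_threshold = 0;
    };

    // Mirrors the condition in _build_pipeline_x_tasks: parallelize only when
    // the threshold was sent and the instance count exceeds it.
    bool should_prepare_in_parallel(int target_size, const QueryOptions& opts) {
        return target_size > 1 && opts.isset_parallel_prepare_threshold &&
               target_size > opts.parallel_prepare_threshold;
    }

    int main() {
        QueryOptions opts;
        opts.isset_parallel_prepare_threshold = true;
        opts.parallel_prepare_threshold = 32;  // FE default from SessionVariable.java

        std::printf("8 instances  -> %s\n",
                    should_prepare_in_parallel(8, opts) ? "parallel" : "serial");
        std::printf("64 instances -> %s\n",
                    should_prepare_in_parallel(64, opts) ? "parallel" : "serial");
        return 0;
    }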