diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 0b7b196d6a6adf..dc1277f721434a 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -72,8 +72,6 @@ class Pipeline : public std::enable_shared_from_this { RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); } - bool can_steal() const { return _can_steal; } - void disable_task_steal() { _can_steal = false; } private: diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index de9063452ebf3e..a2583271b05fbb 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -70,8 +70,12 @@ #include "util/container_util.hpp" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/join/vnested_loop_join_node.h" +#include "vec/exec/scan/new_es_scan_node.h" #include "vec/exec/scan/new_file_scan_node.h" +#include "vec/exec/scan/new_jdbc_scan_node.h" +#include "vec/exec/scan/new_odbc_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" +#include "vec/exec/scan/vmeta_scan_node.h" #include "vec/exec/scan/vscan_node.h" #include "vec/exec/vaggregation_node.h" #include "vec/exec/vexchange_node.h" @@ -152,148 +156,6 @@ PipelinePtr PipelineFragmentContext::add_pipeline() { return pipeline; } -Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& request) { - if (_prepared) { - return Status::InternalError("Already prepared"); - } - _runtime_profile.reset(new RuntimeProfile("PipelineContext")); - _start_timer = ADD_TIMER(_runtime_profile, "StartTime"); - COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time()); - _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime"); - SCOPED_TIMER(_prepare_timer); - - auto* fragment_context = this; - OpentelemetryTracer tracer = telemetry::get_noop_tracer(); - if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) { - tracer = telemetry::get_tracer(print_id(_query_id)); - } - START_AND_SCOPE_SPAN(tracer, span, "PipelineFragmentExecutor::prepare"); - - const TPlanFragmentExecParams& params = request.params; - - LOG_INFO("PipelineFragmentContext::prepare") - .tag("query_id", _query_id) - .tag("instance_id", params.fragment_instance_id) - .tag("backend_num", request.backend_num) - .tag("pthread_id", (uintptr_t)pthread_self()); - - // 1. init _runtime_state - _runtime_state = std::make_unique(params, request.query_options, - _query_ctx->query_globals, _exec_env); - _runtime_state->set_query_fragments_ctx(_query_ctx.get()); - _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); - _runtime_state->set_tracer(std::move(tracer)); - - // TODO should be combine with plan_fragment_executor.prepare funciton - SCOPED_ATTACH_TASK(get_runtime_state()); - _runtime_state->runtime_filter_mgr()->init(); - _runtime_state->set_be_number(request.backend_num); - - if (request.__isset.backend_id) { - _runtime_state->set_backend_id(request.backend_id); - } - if (request.__isset.import_label) { - _runtime_state->set_import_label(request.import_label); - } - if (request.__isset.db_name) { - _runtime_state->set_db_name(request.db_name); - } - if (request.__isset.load_job_id) { - _runtime_state->set_load_job_id(request.load_job_id); - } - - if (request.query_options.__isset.is_report_success) { - fragment_context->set_is_report_success(request.query_options.is_report_success); - } - - auto* desc_tbl = _query_ctx->desc_tbl; - _runtime_state->set_desc_tbl(desc_tbl); - - // 2. Create ExecNode to build pipeline with PipelineFragmentContext - RETURN_IF_ERROR(ExecNode::create_tree(_runtime_state.get(), _runtime_state->obj_pool(), - request.fragment.plan, *desc_tbl, &_root_plan)); - - // Set senders of exchange nodes before pipeline build - std::vector exch_nodes; - _root_plan->collect_nodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes); - for (ExecNode* exch_node : exch_nodes) { - DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE); - int num_senders = find_with_default(params.per_exch_num_senders, exch_node->id(), 0); - DCHECK_GT(num_senders, 0); - static_cast(exch_node)->set_num_senders(num_senders); - } - - // All prepare work do in exec node tree - RETURN_IF_ERROR(_root_plan->prepare(_runtime_state.get())); - // set scan ranges - std::vector scan_nodes; - std::vector no_scan_ranges; - _root_plan->collect_scan_nodes(&scan_nodes); - VLOG_CRITICAL << "scan_nodes.size()=" << scan_nodes.size(); - VLOG_CRITICAL << "params.per_node_scan_ranges.size()=" << params.per_node_scan_ranges.size(); - - _root_plan->try_do_aggregate_serde_improve(); - // set scan range in ScanNode - for (int i = 0; i < scan_nodes.size(); ++i) { - // TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode. - ExecNode* node = scan_nodes[i]; - if (typeid(*node) == typeid(vectorized::NewOlapScanNode) || - typeid(*node) == typeid(vectorized::NewFileScanNode) // || -// typeid(*node) == typeid(vectorized::NewOdbcScanNode) || -// typeid(*node) == typeid(vectorized::NewEsScanNode) -#ifdef LIBJVM -// || typeid(*node) == typeid(vectorized::NewJdbcScanNode) -#endif - ) { - auto* scan_node = static_cast(scan_nodes[i]); - const std::vector& scan_ranges = - find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); - scan_node->set_scan_ranges(scan_ranges); - } else { - ScanNode* scan_node = static_cast(scan_nodes[i]); - const std::vector& scan_ranges = - find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); - scan_node->set_scan_ranges(scan_ranges); - VLOG_CRITICAL << "scan_node_Id=" << scan_node->id() << " size=" << scan_ranges.size(); - } - } - - _runtime_state->set_per_fragment_instance_idx(params.sender_id); - _runtime_state->set_num_per_fragment_instances(params.num_senders); - - if (request.fragment.__isset.output_sink) { - RETURN_IF_ERROR(DataSink::create_data_sink( - _runtime_state->obj_pool(), request.fragment.output_sink, - request.fragment.output_exprs, params, _root_plan->row_desc(), _runtime_state.get(), - &_sink, *desc_tbl)); - } - - _root_pipeline = fragment_context->add_pipeline(); - RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline)); - if (_sink) { - RETURN_IF_ERROR(_create_sink(request.fragment.output_sink)); - } - RETURN_IF_ERROR(_build_pipeline_tasks(request)); - if (_sink) { - _runtime_state->runtime_profile()->add_child(_sink->profile(), true, nullptr); - } - _runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr); - _runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr); - - if (_is_report_success && config::status_report_interval > 0) { - std::unique_lock l(_report_thread_lock); - _exec_env->send_report_thread_pool()->submit_func([this] { - Defer defer {[&]() { this->_report_thread_promise.set_value(true); }}; - this->report_profile(); - }); - // make sure the thread started up, otherwise report_profile() might get into a race - // with stop_report_thread() - _report_thread_started_cv.wait(l); - } - _prepared = true; - return Status::OK(); -} - Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request, const size_t idx) { if (_prepared) { @@ -385,23 +247,18 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re // TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode. ExecNode* node = scan_nodes[i]; if (typeid(*node) == typeid(vectorized::NewOlapScanNode) || - typeid(*node) == typeid(vectorized::NewFileScanNode) // || -// typeid(*node) == typeid(vectorized::NewOdbcScanNode) || -// typeid(*node) == typeid(vectorized::NewEsScanNode) + typeid(*node) == typeid(vectorized::NewFileScanNode) || + typeid(*node) == typeid(vectorized::NewOdbcScanNode) || + typeid(*node) == typeid(vectorized::NewEsScanNode) || + typeid(*node) == typeid(vectorized::VMetaScanNode) #ifdef LIBJVM -// || typeid(*node) == typeid(vectorized::NewJdbcScanNode) + || typeid(*node) == typeid(vectorized::NewJdbcScanNode) #endif ) { auto* scan_node = static_cast(scan_nodes[i]); const std::vector& scan_ranges = find_with_default( local_params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); scan_node->set_scan_ranges(scan_ranges); - } else { - ScanNode* scan_node = static_cast(scan_nodes[i]); - const std::vector& scan_ranges = find_with_default( - local_params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); - scan_node->set_scan_ranges(scan_ranges); - VLOG_CRITICAL << "scan_node_Id=" << scan_node->id() << " size=" << scan_ranges.size(); } } @@ -431,30 +288,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re return Status::OK(); } -Status PipelineFragmentContext::_build_pipeline_tasks( - const doris::TExecPlanFragmentParams& request) { - for (PipelinePtr& pipeline : _pipelines) { - // if sink - auto sink = pipeline->sink()->build_operator(); - // TODO pipeline 1 need to add new interface for exec node and operator - sink->init(request.fragment.output_sink); - - Operators operators; - RETURN_IF_ERROR(pipeline->build_operators(operators)); - auto task = std::make_unique(pipeline, 0, _runtime_state.get(), operators, - sink, this, pipeline->pipeline_profile()); - sink->set_child(task->get_root()); - _tasks.emplace_back(std::move(task)); - _runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr); - } - - for (auto& task : _tasks) { - RETURN_IF_ERROR(task->prepare(_runtime_state.get())); - } - _total_tasks = _tasks.size(); - return Status::OK(); -} - Status PipelineFragmentContext::_build_pipeline_tasks( const doris::TPipelineFragmentParams& request) { for (PipelinePtr& pipeline : _pipelines) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 31520d3cac67ea..b8db2728d2d66b 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -66,8 +66,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr); @@ -162,8 +155,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this _query_ctx; - // If set the true, this plan fragment will be executed only after FE send execution start rpc. - bool _need_wait_execution_trigger = false; std::shared_ptr _merge_controller_handler; MonotonicStopWatch _fragment_watcher; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index df7e7de6a50189..d846d8940d1328 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -126,9 +126,6 @@ class PipelineTask { void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } void start_schedule_watcher() { _wait_schedule_watcher.start(); } void stop_schedule_watcher() { _wait_schedule_watcher.stop(); } - - int pipeline_id() const { return _pipeline->_pipeline_id; } - PipelineTaskState get_state() { return _cur_state; } void set_state(PipelineTaskState state); @@ -173,8 +170,6 @@ class PipelineTask { bool has_dependency(); - uint32_t index() const { return _index; } - OperatorPtr get_root() { return _root; } std::string debug_string() const; diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 7fcceddcf3c71a..50284664f3ba31 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -61,7 +61,6 @@ class TaskScheduler { TaskScheduler(ExecEnv* exec_env, std::shared_ptr b_scheduler, std::shared_ptr task_queue) : _task_queue(std::move(task_queue)), - _exec_env(exec_env), _blocked_task_scheduler(std::move(b_scheduler)), _shutdown(false) {} @@ -73,13 +72,10 @@ class TaskScheduler { void shutdown(); - ExecEnv* exec_env() { return _exec_env; } - private: std::unique_ptr _fix_thread_pool; std::shared_ptr _task_queue; std::vector>> _markers; - ExecEnv* _exec_env; std::shared_ptr _blocked_task_scheduler; std::atomic _shutdown; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp old mode 100755 new mode 100644 index f683828c5d8963..c8f863f42e7b98 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -726,72 +726,37 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, } int64_t duration_ns = 0; - if (!pipeline_engine_enabled) { - { - SCOPED_RAW_TIMER(&duration_ns); - RETURN_IF_ERROR(exec_state->prepare(params)); - } - g_fragmentmgr_prepare_latency << (duration_ns / 1000); - std::shared_ptr handler; - _runtimefilter_controller.add_entity(params, &handler, - exec_state->executor()->runtime_state()); - exec_state->set_merge_controller_handler(handler); - { - std::lock_guard lock(_lock); - _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state)); - _cv.notify_all(); - } - auto st = _thread_pool->submit_func( - [this, exec_state, cb, - parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] { - OpentelemetryScope scope {parent_span}; - _exec_actual(exec_state, cb); - }); - if (!st.ok()) { - { - // Remove the exec state added - std::lock_guard lock(_lock); - _fragment_map.erase(params.params.fragment_instance_id); - } - exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, - "push plan fragment to thread pool failed"); - return Status::InternalError(strings::Substitute( - "push plan fragment $0 to thread pool failed. err = $1, BE: $2", - print_id(params.params.fragment_instance_id), st.to_string(), - BackendOptions::get_localhost())); - } - } else { - if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { - fragments_ctx->set_ready_to_execute_only(); - } - _setup_shared_hashtable_for_broadcast_join(params, exec_state->executor()->runtime_state(), - fragments_ctx.get()); - std::shared_ptr context = - std::make_shared( - fragments_ctx->query_id, fragment_instance_id, -1, params.backend_num, - fragments_ctx, _exec_env, cb, - std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this, - std::placeholders::_1)); - { - SCOPED_RAW_TIMER(&duration_ns); - auto prepare_st = context->prepare(params); - if (!prepare_st.ok()) { - context->close_if_prepare_failed(); - return prepare_st; - } - } - g_fragmentmgr_prepare_latency << (duration_ns / 1000); - - std::shared_ptr handler; - _runtimefilter_controller.add_entity(params, &handler, context->get_runtime_state()); - context->set_merge_controller_handler(handler); - + DCHECK(!pipeline_engine_enabled); + { + SCOPED_RAW_TIMER(&duration_ns); + RETURN_IF_ERROR(exec_state->prepare(params)); + } + g_fragmentmgr_prepare_latency << (duration_ns / 1000); + std::shared_ptr handler; + _runtimefilter_controller.add_entity(params, &handler, exec_state->executor()->runtime_state()); + exec_state->set_merge_controller_handler(handler); + { + std::lock_guard lock(_lock); + _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state)); + _cv.notify_all(); + } + auto st = _thread_pool->submit_func( + [this, exec_state, cb, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] { + OpentelemetryScope scope {parent_span}; + _exec_actual(exec_state, cb); + }); + if (!st.ok()) { { + // Remove the exec state added std::lock_guard lock(_lock); - _pipeline_map.insert(std::make_pair(fragment_instance_id, context)); - _cv.notify_all(); + _fragment_map.erase(params.params.fragment_instance_id); } - return context->submit(); + exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, + "push plan fragment to thread pool failed"); + return Status::InternalError( + strings::Substitute("push plan fragment $0 to thread pool failed. err = $1, BE: $2", + print_id(params.params.fragment_instance_id), st.to_string(), + BackendOptions::get_localhost())); } return Status::OK(); diff --git a/regression-test/data/query_p0/empty_table/sql/empty_set_node.out b/regression-test/data/query_p0/empty_table/sql/empty_set_node.out new file mode 100644 index 00000000000000..9f80652bb009be --- /dev/null +++ b/regression-test/data/query_p0/empty_table/sql/empty_set_node.out @@ -0,0 +1,3 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !empty_set_node -- + diff --git a/regression-test/data/query_p0/sql_functions/hash_functions/test_hash_function.out b/regression-test/data/query_p0/sql_functions/hash_functions/test_hash_function.out index c7b9485d454b65..221936613d31bf 100644 --- a/regression-test/data/query_p0/sql_functions/hash_functions/test_hash_function.out +++ b/regression-test/data/query_p0/sql_functions/hash_functions/test_hash_function.out @@ -16,3 +16,4 @@ -- !sql -- 3583109472027628045 + diff --git a/regression-test/suites/query_p0/empty_table/sql/empty_set_node.sql b/regression-test/suites/query_p0/empty_table/sql/empty_set_node.sql new file mode 100644 index 00000000000000..4273020b6ffb87 --- /dev/null +++ b/regression-test/suites/query_p0/empty_table/sql/empty_set_node.sql @@ -0,0 +1 @@ +select * from empty where 1 = 2 diff --git a/regression-test/suites/query_p0/sql_functions/hash_functions/test_hash_function.groovy b/regression-test/suites/query_p0/sql_functions/hash_functions/test_hash_function.groovy index 9e8416cce56190..4c7b925a8a0bc4 100644 --- a/regression-test/suites/query_p0/sql_functions/hash_functions/test_hash_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/hash_functions/test_hash_function.groovy @@ -16,6 +16,7 @@ // under the License. suite("test_hash_function") { sql "set batch_size = 4096;" + sql "set enable_profile = true;" qt_sql "SELECT murmur_hash3_32(null);" qt_sql "SELECT murmur_hash3_32(\"hello\");"