Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {

// Returns the raw RuntimeProfile pointer obtained from _pipeline_profile.get().
// NOTE(review): ownership presumably stays with _pipeline_profile — confirm before storing the result.
RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }

// Returns the current value of the _can_steal flag.
bool can_steal() const { return _can_steal; }

// Sets the _can_steal flag to false, so can_steal() reports false afterwards.
void disable_task_steal() { _can_steal = false; }

private:
Expand Down
185 changes: 9 additions & 176 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@
#include "util/container_util.hpp"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/exec/join/vnested_loop_join_node.h"
#include "vec/exec/scan/new_es_scan_node.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/exec/scan/new_jdbc_scan_node.h"
#include "vec/exec/scan/new_odbc_scan_node.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/scan/vmeta_scan_node.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/vexchange_node.h"
Expand Down Expand Up @@ -152,148 +156,6 @@ PipelinePtr PipelineFragmentContext::add_pipeline() {
return pipeline;
}

// Prepares this fragment context from a TExecPlanFragmentParams request.
//
// Steps, in order:
//   - fail with InternalError("Already prepared") if _prepared is already set;
//   - create the "PipelineContext" profile with its StartTime / PrepareTime timers;
//   - create _runtime_state and copy the optional request fields into it;
//   - build the ExecNode tree into _root_plan, set sender counts on exchange nodes,
//     prepare the tree and hand the per-node scan ranges to every scan node;
//   - create the data sink when the fragment declares one, then build the pipelines
//     and the pipeline tasks;
//   - attach the sink / plan / pipeline profiles to the runtime-state profile;
//   - when reporting is enabled, submit report_profile() to the report thread pool.
//
// On success sets _prepared and returns Status::OK(). Each RETURN_IF_ERROR step is
// expected to return its error to the caller (macro defined outside this view).
Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& request) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
_runtime_profile.reset(new RuntimeProfile("PipelineContext"));
_start_timer = ADD_TIMER(_runtime_profile, "StartTime");
// Record the time already elapsed on the fragment stop-watch as the start time.
COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time());
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);

auto* fragment_context = this;
// Use a no-op tracer unless the current span carries a valid context.
OpentelemetryTracer tracer = telemetry::get_noop_tracer();
if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) {
tracer = telemetry::get_tracer(print_id(_query_id));
}
START_AND_SCOPE_SPAN(tracer, span, "PipelineFragmentExecutor::prepare");

const TPlanFragmentExecParams& params = request.params;

LOG_INFO("PipelineFragmentContext::prepare")
.tag("query_id", _query_id)
.tag("instance_id", params.fragment_instance_id)
.tag("backend_num", request.backend_num)
.tag("pthread_id", (uintptr_t)pthread_self());

// 1. Initialize _runtime_state from the request and the shared query context.
_runtime_state = std::make_unique<RuntimeState>(params, request.query_options,
_query_ctx->query_globals, _exec_env);
_runtime_state->set_query_fragments_ctx(_query_ctx.get());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));

// TODO: should be combined with the plan_fragment_executor.prepare function
SCOPED_ATTACH_TASK(get_runtime_state());
_runtime_state->runtime_filter_mgr()->init();
_runtime_state->set_be_number(request.backend_num);

// Optional thrift fields are only copied when the sender actually set them.
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
}
if (request.__isset.import_label) {
_runtime_state->set_import_label(request.import_label);
}
if (request.__isset.db_name) {
_runtime_state->set_db_name(request.db_name);
}
if (request.__isset.load_job_id) {
_runtime_state->set_load_job_id(request.load_job_id);
}

if (request.query_options.__isset.is_report_success) {
fragment_context->set_is_report_success(request.query_options.is_report_success);
}

auto* desc_tbl = _query_ctx->desc_tbl;
_runtime_state->set_desc_tbl(desc_tbl);

// 2. Create the ExecNode tree that the pipelines of this PipelineFragmentContext are built from.
RETURN_IF_ERROR(ExecNode::create_tree(_runtime_state.get(), _runtime_state->obj_pool(),
request.fragment.plan, *desc_tbl, &_root_plan));

// Set the sender count of every exchange node before the pipelines are built.
std::vector<ExecNode*> exch_nodes;
_root_plan->collect_nodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
for (ExecNode* exch_node : exch_nodes) {
DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
// A missing entry yields 0, which the DCHECK below rejects.
int num_senders = find_with_default(params.per_exch_num_senders, exch_node->id(), 0);
DCHECK_GT(num_senders, 0);
static_cast<vectorized::VExchangeNode*>(exch_node)->set_num_senders(num_senders);
}

// All prepare work is done inside the exec node tree.
RETURN_IF_ERROR(_root_plan->prepare(_runtime_state.get()));
// Collect the scan nodes; no_scan_ranges is the empty default for nodes without ranges.
std::vector<ExecNode*> scan_nodes;
std::vector<TScanRangeParams> no_scan_ranges;
_root_plan->collect_scan_nodes(&scan_nodes);
VLOG_CRITICAL << "scan_nodes.size()=" << scan_nodes.size();
VLOG_CRITICAL << "params.per_node_scan_ranges.size()=" << params.per_node_scan_ranges.size();

_root_plan->try_do_aggregate_serde_improve();
// Hand each scan node its scan ranges, looked up by node id.
for (int i = 0; i < scan_nodes.size(); ++i) {
// TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode.
ExecNode* node = scan_nodes[i];
// Only the exact dynamic types listed here are cast to VScanNode; the
// commented-out types currently take the ScanNode branch below.
if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
typeid(*node) == typeid(vectorized::NewFileScanNode) // ||
// typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
// typeid(*node) == typeid(vectorized::NewEsScanNode)
#ifdef LIBJVM
// || typeid(*node) == typeid(vectorized::NewJdbcScanNode)
#endif
) {
auto* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]);
const std::vector<TScanRangeParams>& scan_ranges =
find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
scan_node->set_scan_ranges(scan_ranges);
} else {
// Same lookup as above, but through the ScanNode type; this branch also logs.
ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
const std::vector<TScanRangeParams>& scan_ranges =
find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
scan_node->set_scan_ranges(scan_ranges);
VLOG_CRITICAL << "scan_node_Id=" << scan_node->id() << " size=" << scan_ranges.size();
}
}

_runtime_state->set_per_fragment_instance_idx(params.sender_id);
_runtime_state->set_num_per_fragment_instances(params.num_senders);

// The data sink is optional; _sink stays unset when the fragment has no output_sink.
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR(DataSink::create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, params, _root_plan->row_desc(), _runtime_state.get(),
&_sink, *desc_tbl));
}

// Build the pipelines starting from a fresh root pipeline, then the tasks that run them.
_root_pipeline = fragment_context->add_pipeline();
RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
if (_sink) {
RETURN_IF_ERROR(_create_sink(request.fragment.output_sink));
}
RETURN_IF_ERROR(_build_pipeline_tasks(request));
// Attach the sink, plan and pipeline-context profiles under the runtime-state profile.
if (_sink) {
_runtime_state->runtime_profile()->add_child(_sink->profile(), true, nullptr);
}
_runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr);
_runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr);

if (_is_report_success && config::status_report_interval > 0) {
std::unique_lock<std::mutex> l(_report_thread_lock);
// NOTE(review): the result of submit_func() is not checked; if submission can fail,
// the wait below would have nothing to wake it — confirm.
_exec_env->send_report_thread_pool()->submit_func([this] {
// Fulfil the promise when the reporting function exits, whatever the exit path.
Defer defer {[&]() { this->_report_thread_promise.set_value(true); }};
this->report_profile();
});
// make sure the thread started up, otherwise report_profile() might get into a race
// with stop_report_thread()
// NOTE(review): wait() is called without a predicate; if this is a
// std::condition_variable, a spurious wakeup would let prepare() continue early — confirm.
_report_thread_started_cv.wait(l);
}
_prepared = true;
return Status::OK();
}

Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
const size_t idx) {
if (_prepared) {
Expand Down Expand Up @@ -385,23 +247,18 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
// TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode.
ExecNode* node = scan_nodes[i];
if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
typeid(*node) == typeid(vectorized::NewFileScanNode) // ||
// typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
// typeid(*node) == typeid(vectorized::NewEsScanNode)
typeid(*node) == typeid(vectorized::NewFileScanNode) ||
typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
typeid(*node) == typeid(vectorized::NewEsScanNode) ||
typeid(*node) == typeid(vectorized::VMetaScanNode)
#ifdef LIBJVM
// || typeid(*node) == typeid(vectorized::NewJdbcScanNode)
|| typeid(*node) == typeid(vectorized::NewJdbcScanNode)
#endif
) {
auto* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]);
const std::vector<TScanRangeParams>& scan_ranges = find_with_default(
local_params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
scan_node->set_scan_ranges(scan_ranges);
} else {
ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
const std::vector<TScanRangeParams>& scan_ranges = find_with_default(
local_params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
scan_node->set_scan_ranges(scan_ranges);
VLOG_CRITICAL << "scan_node_Id=" << scan_node->id() << " size=" << scan_ranges.size();
}
}

Expand Down Expand Up @@ -431,30 +288,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
return Status::OK();
}

// Creates one PipelineTask (task index 0) for every pipeline in _pipelines,
// prepares all tasks stored in _tasks, and records their count in _total_tasks.
// Returns Status::OK() when every build_operators() / prepare() step succeeds.
Status PipelineFragmentContext::_build_pipeline_tasks(
        const doris::TExecPlanFragmentParams& request) {
    auto* const runtime_state = _runtime_state.get();

    for (PipelinePtr& pipeline : _pipelines) {
        // Build the sink operator first and initialise it from the fragment's output sink.
        // TODO pipeline 1 need to add new interface for exec node and operator
        // NOTE(review): any value returned by init() is discarded here — confirm it cannot fail.
        auto sink_operator = pipeline->sink()->build_operator();
        sink_operator->init(request.fragment.output_sink);

        Operators pipeline_operators;
        RETURN_IF_ERROR(pipeline->build_operators(pipeline_operators));

        auto* const profile = pipeline->pipeline_profile();
        auto new_task = std::make_unique<PipelineTask>(pipeline, 0, runtime_state,
                                                       pipeline_operators, sink_operator, this,
                                                       profile);
        // Wire the sink to the task's root operator before ownership of the task moves to _tasks.
        sink_operator->set_child(new_task->get_root());
        _tasks.emplace_back(std::move(new_task));
        _runtime_profile->add_child(profile, true, nullptr);
    }

    // Prepare only after every task exists; the first failure is returned to the caller.
    for (auto& pending_task : _tasks) {
        RETURN_IF_ERROR(pending_task->prepare(runtime_state));
    }

    _total_tasks = _tasks.size();
    return Status::OK();
}

Status PipelineFragmentContext::_build_pipeline_tasks(
const doris::TPipelineFragmentParams& request) {
for (PipelinePtr& pipeline : _pipelines) {
Expand Down
9 changes: 0 additions & 9 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag

// Returns the current value of _next_operator_builder_id and then increments it (post-increment).
// NOTE(review): whether the counter is safe for concurrent callers is not visible here — confirm.
int32_t next_operator_builder_id() { return _next_operator_builder_id++; }

Status prepare(const doris::TExecPlanFragmentParams& request);

Status prepare(const doris::TPipelineFragmentParams& request, const size_t idx);

Status submit();
Expand All @@ -76,10 +74,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag

// Stores `is_report_success` in the _is_report_success member.
void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }

// Returns a mutable reference to the _root_plan pointer itself, so a caller can
// both read the pointer and reassign it through the returned reference.
ExecNode*& plan() { return _root_plan; }

// Sets _need_wait_execution_trigger to true; this class offers no visible way to clear it again.
void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }

void cancel(const PPlanFragmentCancelReason& reason = PPlanFragmentCancelReason::INTERNAL_ERROR,
const std::string& msg = "");

Expand Down Expand Up @@ -115,7 +109,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
private:
Status _create_sink(const TDataSink& t_data_sink);
Status _build_pipelines(ExecNode*, PipelinePtr);
Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams& request);
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
Expand Down Expand Up @@ -162,8 +155,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag

std::shared_ptr<QueryFragmentsCtx> _query_ctx;

// If set to true, this plan fragment will be executed only after FE sends the execution start rpc.
bool _need_wait_execution_trigger = false;
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;

MonotonicStopWatch _fragment_watcher;
Expand Down
5 changes: 0 additions & 5 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ class PipelineTask {
// Stops the _wait_worker_watcher stop-watch.
// NOTE(review): presumably called when the task leaves the runnable queue — confirm against callers.
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
// Starts the _wait_schedule_watcher stop-watch.
void start_schedule_watcher() { _wait_schedule_watcher.start(); }
// Stops the _wait_schedule_watcher stop-watch.
void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }

// Returns the _pipeline_id member read directly from the owning _pipeline object.
// NOTE(review): _pipeline is dereferenced without a null check — assumes it is always set.
int pipeline_id() const { return _pipeline->_pipeline_id; }

// Returns the task's current state as stored in _cur_state.
PipelineTaskState get_state() { return _cur_state; }
void set_state(PipelineTaskState state);

Expand Down Expand Up @@ -173,8 +170,6 @@ class PipelineTask {

bool has_dependency();

// Returns the value of the _index member.
uint32_t index() const { return _index; }

// Returns a copy of the _root operator handle.
OperatorPtr get_root() { return _root; }

std::string debug_string() const;
Expand Down
4 changes: 0 additions & 4 deletions be/src/pipeline/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class TaskScheduler {
// Constructs a scheduler around an existing task queue and blocked-task scheduler.
//   exec_env    - stored as a raw pointer in _exec_env.
//   b_scheduler - taken by value and moved into _blocked_task_scheduler.
//   task_queue  - taken by value and moved into _task_queue.
// _shutdown starts out false; the constructor body itself does nothing.
TaskScheduler(ExecEnv* exec_env, std::shared_ptr<BlockedTaskScheduler> b_scheduler,
std::shared_ptr<TaskQueue> task_queue)
: _task_queue(std::move(task_queue)),
_exec_env(exec_env),
_blocked_task_scheduler(std::move(b_scheduler)),
_shutdown(false) {}

Expand All @@ -73,13 +72,10 @@ class TaskScheduler {

void shutdown();

// Returns the raw ExecEnv pointer stored in _exec_env (set from the constructor argument).
ExecEnv* exec_env() { return _exec_env; }

private:
std::unique_ptr<ThreadPool> _fix_thread_pool;
std::shared_ptr<TaskQueue> _task_queue;
std::vector<std::unique_ptr<std::atomic<bool>>> _markers;
ExecEnv* _exec_env;
std::shared_ptr<BlockedTaskScheduler> _blocked_task_scheduler;
std::atomic<bool> _shutdown;

Expand Down
Loading