2 changes: 1 addition & 1 deletion be/src/common/config.cpp
@@ -511,7 +511,7 @@ DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
DEFINE_mBool(enable_bthread_transmit_block, "true");

// The maximum amount of data that can be processed by a stream load
DEFINE_mInt64(streaming_load_max_mb, "10240");
DEFINE_mInt64(streaming_load_max_mb, "102400");
// Some data formats, such as JSON, cannot be streamed.
// Therefore, it is necessary to limit the maximum size of
// such data when using stream load to prevent excessive memory consumption.
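Editor note: streaming_load_max_mb is measured in megabytes, so the default moves from 10240 MB (10 GB) to 102400 MB (100 GB) per stream load. A stand-alone sketch of the byte arithmetic such a limit implies (the names here are made up for illustration, not the BE's actual check):

```cpp
#include <cstdint>
#include <iostream>

// Editor sketch, not Doris code: streaming_load_max_mb is a size limit in megabytes,
// so the new default of 102400 MB allows stream load bodies of up to ~100 GB.
constexpr int64_t kStreamingLoadMaxMb = 102400;

bool exceeds_stream_load_limit(int64_t body_bytes) {
    return body_bytes > kStreamingLoadMaxMb * 1024 * 1024;
}

int main() {
    std::cout << std::boolalpha
              << exceeds_stream_load_limit(int64_t{50} << 30) << "\n"   // 50 GB  -> false
              << exceeds_stream_load_limit(int64_t{200} << 30) << "\n"; // 200 GB -> true
}
```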
1 change: 1 addition & 0 deletions be/src/exprs/runtime_filter.cpp
@@ -1235,6 +1235,7 @@ void IRuntimeFilter::signal() {
}

void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
std::unique_lock lock(_inner_mutex);
_filter_timer.push_back(timer);
}

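Editor note: the new std::unique_lock serializes writers of _filter_timer; with fragment instances now prepared concurrently (see the thread-pool changes below), set_filter_timer can be called from multiple threads, and an unguarded push_back on the vector would race. A stand-alone sketch of the same idiom, with stand-in types:

```cpp
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Minimal sketch (not Doris code): several threads may register timers on the same
// filter concurrently, so the vector push_back is guarded by a mutex.
struct TimerRegistry {
    std::mutex mu;
    std::vector<std::shared_ptr<int>> timers; // stand-in for RuntimeFilterTimer

    void add(std::shared_ptr<int> t) {
        std::unique_lock<std::mutex> lock(mu); // same idiom as set_filter_timer
        timers.push_back(std::move(t));
    }
};

int main() {
    TimerRegistry reg;
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i) {
        workers.emplace_back([&reg, i] { reg.add(std::make_shared<int>(i)); });
    }
    for (auto& w : workers) {
        w.join();
    }
    return reg.timers.size() == 4 ? 0 : 1;
}
```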
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline.h
@@ -105,7 +105,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }

void incr_created_tasks() { _num_tasks_created++; }
bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
for (auto& op : operatorXs) {
@@ -160,7 +159,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
// How many tasks should be created?
int _num_tasks = 1;
// How many tasks are already created?
int _num_tasks_created = 0;
std::atomic<int> _num_tasks_created = 0;
};

} // namespace doris::pipeline
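Editor note: _num_tasks_created becomes std::atomic<int> because incr_created_tasks() can now run from several prepare threads at once (see the thread-pool path added in pipeline_fragment_context.cpp); a plain int increment there would be a data race. A stand-alone illustration, not Doris code:

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Stand-alone illustration: with parallel prepare, several threads may create tasks
// for the same Pipeline, so the created-task counter must be atomic.
std::atomic<int> num_tasks_created{0};

void incr_created_tasks() { num_tasks_created.fetch_add(1, std::memory_order_relaxed); }

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i) {
        threads.emplace_back(incr_created_tasks);
    }
    for (auto& t : threads) {
        t.join();
    }
    return num_tasks_created.load() == 8 ? 0 : 1; // always 8; a plain int could lose updates
}
```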
98 changes: 74 additions & 24 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -137,8 +137,10 @@ PipelineFragmentContext::~PipelineFragmentContext() {
}
}
_tasks.clear();
for (auto& runtime_state : _task_runtime_states) {
runtime_state.reset();
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
runtime_state.reset();
}
}
_pipelines.clear();
_sink.reset();
@@ -229,7 +231,8 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
return pipeline;
}

Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {
Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
Review comment (clang-tidy):
warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size]
Additional context: be/src/pipeline/pipeline_fragment_context.cpp:233: 124 lines including whitespace and comments (threshold 80)
ThreadPool* thread_pool) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
@@ -346,7 +349,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
{
SCOPED_TIMER(_build_tasks_timer);
// 5. Build pipeline tasks and initialize local state.
RETURN_IF_ERROR(_build_pipeline_tasks(request));
RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
}

_init_next_report_time();
@@ -355,17 +358,23 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
return Status::OK();
}

Status PipelineFragmentContext::_build_pipeline_tasks(
const doris::TPipelineFragmentParams& request) {
Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
Review comment (clang-tidy):
warning: function '_build_pipeline_tasks' has cognitive complexity of 56 (threshold 50) [readability-function-cognitive-complexity]
Additional context: per-line nesting-penalty breakdown covering be/src/pipeline/pipeline_fragment_context.cpp:373-571 and the RETURN_IF_ERROR expansions in be/src/common/status.h:628-630 (omitted here).

Review comment (clang-tidy):
warning: function '_build_pipeline_tasks' exceeds recommended size/complexity thresholds [readability-function-size]
Additional context: be/src/pipeline/pipeline_fragment_context.cpp:360: 218 lines including whitespace and comments (threshold 80)
ThreadPool* thread_pool) {
_total_tasks = 0;
int target_size = request.local_params.size();
const auto target_size = request.local_params.size();
_tasks.resize(target_size);
_fragment_instance_ids.resize(target_size);
_runtime_filter_states.resize(target_size);
_task_runtime_states.resize(_pipelines.size());
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
}
auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());

for (size_t i = 0; i < target_size; i++) {
auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) {
const auto& local_params = request.local_params[i];
auto fragment_instance_id = local_params.fragment_instance_id;
_fragment_instance_ids.push_back(fragment_instance_id);
_fragment_instance_ids[i] = fragment_instance_id;
std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
@@ -424,7 +433,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(

filterparams->runtime_filter_mgr = runtime_filter_mgr.get();

_runtime_filter_states.push_back(std::move(filterparams));
_runtime_filter_states[i] = std::move(filterparams);
std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
auto get_local_exchange_state = [&](PipelinePtr pipeline)
-> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
@@ -447,13 +456,15 @@

for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto& pipeline = _pipelines[pip_idx];
if (pipeline->need_to_create_task()) {
// build task runtime state
_task_runtime_states.push_back(RuntimeState::create_unique(
if (pipeline->num_tasks() > 1 || i == 0) {
DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
<< print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
<< pipeline->debug_string();
_task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
this, local_params.fragment_instance_id, request.query_id,
request.fragment_id, request.query_options, _query_ctx->query_globals,
_exec_env, _query_ctx.get()));
auto& task_runtime_state = _task_runtime_states.back();
_exec_env, _query_ctx.get());
auto& task_runtime_state = _task_runtime_states[pip_idx][i];
init_runtime_state(task_runtime_state);
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
@@ -527,6 +538,39 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
std::lock_guard<std::mutex> l(_state_map_lock);
_runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr);
}
return Status::OK();
};
if (target_size > 1 &&
(_runtime_state->query_options().__isset.parallel_prepare_threshold &&
target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
std::vector<Status> prepare_status(target_size);
std::mutex m;
std::condition_variable cv;
int prepare_done = 0;
for (size_t i = 0; i < target_size; i++) {
RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
SCOPED_ATTACH_TASK(_query_ctx.get());
prepare_status[i] = pre_and_submit(i, this);
std::unique_lock<std::mutex> lock(m);
prepare_done++;
if (prepare_done == target_size) {
cv.notify_one();
}
}));
}
std::unique_lock<std::mutex> lock(m);
if (prepare_done != target_size) {
cv.wait(lock);
for (size_t i = 0; i < target_size; i++) {
if (!prepare_status[i].ok()) {
return prepare_status[i];
}
}
}
} else {
for (size_t i = 0; i < target_size; i++) {
RETURN_IF_ERROR(pre_and_submit(i, this));
}
}
_pipeline_parent_map.clear();
_dag.clear();
@@ -1683,8 +1727,12 @@ Status PipelineFragmentContext::send_report(bool done) {

std::vector<RuntimeState*> runtime_states;

for (auto& task_state : _task_runtime_states) {
runtime_states.push_back(task_state.get());
for (auto& task_states : _task_runtime_states) {
for (auto& task_state : task_states) {
if (task_state) {
runtime_states.push_back(task_state.get());
}
}
}

ReportStatusRequest req {exec_status,
@@ -1755,15 +1803,17 @@
return nullptr;
}

for (auto& runtime_state : _task_runtime_states) {
if (runtime_state->runtime_profile() == nullptr) {
continue;
}
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
if (runtime_state->runtime_profile() == nullptr) {
continue;
}

auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();

runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
}
}

auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
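Editor note on the hunks above: when the instance count passes the new parallel_prepare_threshold session variable, _build_pipeline_tasks wraps the per-instance work in a pre_and_submit lambda, submits it to a thread pool, counts completions under a mutex, and waits on a condition variable before checking every instance's Status. A self-contained sketch of that fan-out/join shape (std::thread and std::string stand in for the Doris ThreadPool and Status; this is not the actual implementation):

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Stand-alone sketch: each fragment instance is prepared on a worker, completions are
// counted under a mutex, and the caller waits on a condition variable before inspecting
// per-instance statuses.
int main() {
    const size_t target_size = 8;
    std::vector<std::string> prepare_status(target_size, "OK");
    std::mutex m;
    std::condition_variable cv;
    size_t prepare_done = 0;

    auto pre_and_submit = [](size_t /*instance*/) -> std::string {
        return "OK"; // per-instance prepare work would happen here
    };

    std::vector<std::thread> workers;
    for (size_t i = 0; i < target_size; ++i) {
        workers.emplace_back([&, i] {
            std::string st = pre_and_submit(i);
            std::unique_lock<std::mutex> lock(m);
            prepare_status[i] = std::move(st);
            if (++prepare_done == target_size) {
                cv.notify_one();
            }
        });
    }
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return prepare_done == target_size; });
    }
    for (auto& w : workers) {
        w.join();
    }
    for (const auto& st : prepare_status) {
        if (st != "OK") {
            return 1; // surface the first failed instance, as the real code does
        }
    }
    return 0;
}
```

Waiting on a predicate, as in the sketch, also guards against spurious wakeups.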
9 changes: 5 additions & 4 deletions be/src/pipeline/pipeline_fragment_context.h
@@ -88,7 +88,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); }

Status prepare(const doris::TPipelineFragmentParams& request);
Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool);

Status submit();

@@ -187,7 +187,8 @@ class PipelineFragmentContext : public TaskExecutionContext {

bool _enable_local_shuffle() const { return _runtime_state->enable_local_shuffle(); }

Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool);
void _close_fragment_instance();
void _init_next_report_time();

@@ -206,7 +207,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
int _closed_tasks = 0;
// After prepared, `_total_tasks` is equal to the size of `_tasks`.
// When submit fails, `_total_tasks` is equal to the number of tasks submitted.
int _total_tasks = 0;
std::atomic<int> _total_tasks = 0;

std::unique_ptr<RuntimeProfile> _runtime_profile;
bool _is_report_success = false;
@@ -303,7 +304,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

std::vector<TUniqueId> _fragment_instance_ids;
// Local runtime states for each task
std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;

std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;

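Editor note: the header change above turns _task_runtime_states into a per-pipeline, per-instance grid, sized from _pipelines and each pipeline's num_tasks(); slots may legitimately stay empty, which is why send_report now skips null entries. A small stand-alone sketch of that layout (int stands in for RuntimeState; the sizes are made up):

```cpp
#include <memory>
#include <vector>

// Stand-alone sketch: outer index = pipeline, inner index = fragment instance, so each
// (pipeline, instance) pair owns at most one runtime state.
int main() {
    const std::vector<size_t> num_tasks_per_pipeline = {4, 1, 4}; // hypothetical shape

    std::vector<std::vector<std::unique_ptr<int>>> task_runtime_states(
            num_tasks_per_pipeline.size());
    for (size_t pip_idx = 0; pip_idx < num_tasks_per_pipeline.size(); ++pip_idx) {
        task_runtime_states[pip_idx].resize(num_tasks_per_pipeline[pip_idx]);
    }

    // Filled lazily, mirroring `_task_runtime_states[pip_idx][i] = RuntimeState::create_unique(...)`;
    // other slots stay nullptr and must be skipped by readers.
    task_runtime_states[0][2] = std::make_unique<int>(42);

    int non_null = 0;
    for (const auto& per_pipeline : task_runtime_states) {
        for (const auto& state : per_pipeline) {
            if (state) {
                ++non_null;
            }
        }
    }
    return non_null == 1 ? 0 : 1;
}
```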
3 changes: 2 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
@@ -820,7 +820,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
{
SCOPED_RAW_TIMER(&duration_ns);
Status prepare_st = Status::OK();
ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params), prepare_st);
ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params, _thread_pool.get()),
prepare_st);
if (!prepare_st.ok()) {
query_ctx->cancel(prepare_st, params.fragment_id);
query_ctx->set_execution_dependency_ready();
8 changes: 8 additions & 0 deletions be/src/vec/sink/vdata_stream_sender.cpp
@@ -92,6 +92,14 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {

template <typename Parent>
Status Channel<Parent>::open(RuntimeState* state) {
if (_is_local) {
auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
_fragment_instance_id, _dest_node_id, &_local_recvr);
if (!st.ok()) {
// Recvr not found. Maybe downstream task is finished already.
LOG(INFO) << "Recvr is not found : " << st.to_string();
}
}
_be_number = state->be_number();
_brpc_request = std::make_shared<PTransmitDataParams>();
// initialize brpc request
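Editor note: Channel::open() now looks up the local receiver eagerly and treats a miss as benign, since the downstream task may already have finished; it only logs and continues. A stand-alone sketch of that tolerant-lookup shape (find_recvr and the types here are stand-ins, not the Doris stream-manager API):

```cpp
#include <iostream>
#include <memory>
#include <optional>

// Stand-alone sketch: a failed receiver lookup is logged and open() continues
// instead of returning an error.
std::optional<std::shared_ptr<int>> find_recvr(bool downstream_alive) {
    if (!downstream_alive) {
        return std::nullopt; // "Recvr is not found"
    }
    return std::make_shared<int>(42);
}

bool open_channel(bool is_local, bool downstream_alive) {
    std::shared_ptr<int> local_recvr;
    if (is_local) {
        if (auto recvr = find_recvr(downstream_alive)) {
            local_recvr = *recvr;
        } else {
            // Not fatal: the downstream task may be finished already.
            std::cout << "Recvr is not found, continuing open()\n";
        }
    }
    std::cout << "local receiver resolved: " << std::boolalpha
              << (local_recvr != nullptr) << "\n";
    return true; // open() still succeeds and goes on to set up the brpc request
}

int main() {
    open_channel(/*is_local=*/true, /*downstream_alive=*/false);
    open_channel(/*is_local=*/true, /*downstream_alive=*/true);
}
```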
@@ -290,6 +290,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String AUTO_BROADCAST_JOIN_THRESHOLD = "auto_broadcast_join_threshold";

public static final String PARALLEL_PREPARE_THRESHOLD = "parallel_prepare_threshold";

public static final String ENABLE_PROJECTION = "enable_projection";

public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_query";
@@ -1010,7 +1012,7 @@ public class SessionVariable implements Serializable, Writable {

@VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = true,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private long parallelScanMinRowsPerScanner = 16384; // 16K
private long parallelScanMinRowsPerScanner = 2097152; // 2M

@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@@ -1053,6 +1055,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD)
public double autoBroadcastJoinThreshold = 0.8;

@VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD, fuzzy = true)
public int parallelPrepareThreshold = 32;

@VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
private boolean enableJoinReorderBasedCost = false;

@@ -2109,6 +2114,7 @@ public void initFuzzyModeVariables() {
Random random = new SecureRandom();
this.parallelExecInstanceNum = random.nextInt(8) + 1;
this.parallelPipelineTaskNum = random.nextInt(8);
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
// This will cause be dead loop, disable it first
@@ -3529,6 +3535,7 @@ public TQueryOptions toThrift() {
tResult.setNumScannerThreads(numScannerThreads);
tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
tResult.setMaxColumnReaderNum(maxColumnReaderNum);
tResult.setParallelPrepareThreshold(parallelPrepareThreshold);

// TODO chenhao, reservation will be calculated by cost
tResult.setMinReservation(0);
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
@@ -332,6 +332,7 @@ struct TQueryOptions {

125: optional bool enable_segment_cache = true;

132: optional i32 parallel_prepare_threshold = 0;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.