diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 0a3eb7c70639e1..b869efe0f9a540 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -244,8 +244,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { } } }}; + + auto execution_context = state->get_task_execution_context(); + _shared_state_holder = _shared_state->shared_from_this(); + status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( - [this, &parent, state] { + [this, &parent, state, execution_context] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return Status::Cancelled("Cancelled"); + } SCOPED_ATTACH_TASK(state); SCOPED_TIMER(Base::_spill_timer); Defer defer {[&]() { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 9628fb5766e7b6..5e61738681235d 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -276,6 +276,11 @@ class PartitionedAggSinkLocalState std::mutex _spill_lock; std::condition_variable _spill_cv; + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::shared_ptr _shared_state_holder; + // temp structures during spilling vectorized::MutableColumns key_columns_; vectorized::MutableColumns value_columns_; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 82bb25e674cb6b..916a4e82681428 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -196,9 +196,18 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table()); _dependency->Dependency::block(); + auto execution_context = state->get_task_execution_context(); + _shared_state_holder = _shared_state->shared_from_this(); RETURN_IF_ERROR( ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( - [this, state] { + [this, state, execution_context] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + // FIXME: return status is meaningless? + return Status::Cancelled("Cancelled"); + } + SCOPED_ATTACH_TASK(state); Defer defer {[&]() { if (!_status.ok()) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index cc408c07e21759..ac63402f227023 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -63,6 +63,11 @@ class PartitionedAggLocalState final : public PipelineXSpillLocalState _shared_state_holder; + std::unique_ptr _internal_runtime_profile; RuntimeProfile::Counter* _get_results_timer = nullptr; RuntimeProfile::Counter* _serialize_result_timer = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 93ce24e8e72695..25ac9d333b165b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -174,29 +174,37 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( build_spilling_stream->get_spill_root_dir()); - return spill_io_pool->submit_func([state, &build_spilling_stream, &mutable_block, this] { - (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); - if (_spill_status_ok) { - auto build_block = mutable_block->to_block(); - DCHECK_EQ(mutable_block->rows(), 0); - auto st = build_spilling_stream->spill_block(build_block, false); - if (!st.ok()) { - std::unique_lock lock(_spill_lock); - _spill_status_ok = false; - _spill_status = std::move(st); - } else { - COUNTER_UPDATE(_spill_build_rows, build_block.rows()); - COUNTER_UPDATE(_spill_build_blocks, 1); - } - } - --_spilling_task_count; + auto execution_context = state->get_task_execution_context(); + _shared_state_holder = _shared_state->shared_from_this(); + return spill_io_pool->submit_func( + [execution_context, state, &build_spilling_stream, &mutable_block, this] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return; + } + (void)state; // avoid ut compile error + SCOPED_ATTACH_TASK(state); + if (_spill_status_ok) { + auto build_block = mutable_block->to_block(); + DCHECK_EQ(mutable_block->rows(), 0); + auto st = build_spilling_stream->spill_block(build_block, false); + if (!st.ok()) { + std::unique_lock lock(_spill_lock); + _spill_status_ok = false; + _spill_status = std::move(st); + } else { + COUNTER_UPDATE(_spill_build_rows, build_block.rows()); + COUNTER_UPDATE(_spill_build_blocks, 1); + } + } + --_spilling_task_count; - if (_spilling_task_count == 0) { - std::unique_lock lock(_spill_lock); - _dependency->set_ready(); - } - }); + if (_spilling_task_count == 0) { + std::unique_lock lock(_spill_lock); + _dependency->set_ready(); + } + }); } Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state, @@ -223,34 +231,42 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat } if (!blocks.empty()) { - return spill_io_pool->submit_func([state, &blocks, &spilling_stream, this] { - (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); - COUNTER_UPDATE(_spill_probe_blocks, blocks.size()); - while (!blocks.empty()) { - auto block = std::move(blocks.back()); - blocks.pop_back(); - if (_spill_status_ok) { - auto st = spilling_stream->spill_block(block, false); - if (!st.ok()) { - std::unique_lock lock(_spill_lock); - _spill_status_ok = false; - _spill_status = std::move(st); - break; + auto execution_context = state->get_task_execution_context(); + _shared_state_holder = _shared_state->shared_from_this(); + return spill_io_pool->submit_func( + [execution_context, state, &blocks, spilling_stream, this] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + _dependency->set_ready(); + return; + } + SCOPED_ATTACH_TASK(state); + COUNTER_UPDATE(_spill_probe_blocks, blocks.size()); + while (!blocks.empty() && !state->is_cancelled()) { + auto block = std::move(blocks.back()); + blocks.pop_back(); + if (_spill_status_ok) { + auto st = spilling_stream->spill_block(block, false); + if (!st.ok()) { + std::unique_lock lock(_spill_lock); + _spill_status_ok = false; + _spill_status = std::move(st); + break; + } + COUNTER_UPDATE(_spill_probe_rows, block.rows()); + } else { + break; + } } - COUNTER_UPDATE(_spill_probe_rows, block.rows()); - } else { - break; - } - } - --_spilling_task_count; + --_spilling_task_count; - if (_spilling_task_count == 0) { - std::unique_lock lock(_spill_lock); - _dependency->set_ready(); - } - }); + if (_spilling_task_count == 0) { + std::unique_lock lock(_spill_lock); + _dependency->set_ready(); + } + }); } else { --_spilling_task_count; if (_spilling_task_count == 0) { @@ -296,7 +312,14 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti auto& mutable_block = _shared_state->partitioned_build_blocks[partition_index]; DCHECK(mutable_block != nullptr); - auto read_func = [this, state, &spilled_stream, &mutable_block] { + auto execution_context = state->get_task_execution_context(); + _shared_state_holder = _shared_state->shared_from_this(); + auto read_func = [this, state, &spilled_stream, &mutable_block, execution_context] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return; + } Defer defer([this] { --_spilling_task_count; }); (void)state; // avoid ut compile error SCOPED_ATTACH_TASK(state); @@ -363,7 +386,14 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti auto& blocks = _probe_blocks[partition_index]; /// TODO: maybe recovery more blocks each time. - auto read_func = [this, state, &spilled_stream, &blocks] { + auto execution_context = state->get_task_execution_context(); + _shared_state_holder = _shared_state->shared_from_this(); + auto read_func = [this, execution_context, state, &spilled_stream, &blocks] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return; + } Defer defer([this] { --_spilling_task_count; }); (void)state; // avoid ut compile error SCOPED_ATTACH_TASK(state); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index adbaf19314f3f7..7337017fde661d 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -80,6 +80,11 @@ class PartitionedHashJoinProbeLocalState final std::vector> _partitioned_blocks; std::map> _probe_blocks; + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::shared_ptr _shared_state_holder; + std::vector _probe_spilling_streams; std::unique_ptr _partitioner; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 20b3531c0cd77c..a63f22c1329289 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -74,7 +74,14 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); DCHECK(spill_io_pool != nullptr); - auto st = spill_io_pool->submit_func([this, state, spilling_stream, i] { + auto execution_context = state->get_task_execution_context(); + _shared_state_holder = _shared_state->shared_from_this(); + auto st = spill_io_pool->submit_func([this, execution_context, state, spilling_stream, i] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return; + } (void)state; // avoid ut compile error SCOPED_ATTACH_TASK(state); _spill_to_disk(i, spilling_stream); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index f2d5ca3e1403a1..44081bb0caae07 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -66,6 +66,11 @@ class PartitionedHashJoinSinkLocalState Status _spill_status; std::mutex _spill_status_lock; + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::shared_ptr _shared_state_holder; + std::unique_ptr _partitioner; RuntimeProfile::Counter* _partition_timer = nullptr; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 15edcf3c8d1f12..6e70ba5fcaf9e5 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -214,11 +214,20 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { if (!_eos) { Base::_dependency->Dependency::block(); } + + auto execution_context = state->get_task_execution_context(); + _shared_state_holder = _shared_state->shared_from_this(); status = ExecEnv::GetInstance() ->spill_stream_mgr() ->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir()) - ->submit_func([this, state, &parent] { + ->submit_func([this, state, &parent, execution_context] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return Status::OK(); + } + SCOPED_ATTACH_TASK(state); Defer defer {[&]() { if (!_shared_state->sink_status.ok()) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index aba85abece3215..ae5a3bcb8c7d83 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -48,6 +48,11 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState _shared_state_holder; + std::unique_ptr _runtime_state; std::unique_ptr _internal_runtime_profile; RuntimeProfile::Counter* _partial_sort_timer = nullptr; diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index f82c015a17510b..c772b6b2797e65 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -90,7 +90,15 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } }}; - auto spill_func = [this, state, &parent] { + auto execution_context = state->get_task_execution_context(); + _shared_state_holder = _shared_state->shared_from_this(); + auto spill_func = [this, state, &parent, execution_context] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return Status::OK(); + } + SCOPED_TIMER(_spill_merge_sort_timer); SCOPED_ATTACH_TASK(state); Defer defer {[&]() { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index ae76c4e1b58191..8132dd5a56c9e3 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -57,6 +57,11 @@ class SpillSortLocalState final : public PipelineXSpillLocalState _shared_state_holder; + int64_t _external_sort_bytes_threshold = 134217728; // 128M std::vector _current_merging_streams; std::unique_ptr _merger; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index fe95c1c4470058..622c175edcbc6a 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -57,6 +57,8 @@ static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L; static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD); struct BasicSharedState { + ENABLE_FACTORY_CREATOR(BasicSharedState) + template TARGET* cast() { DCHECK(dynamic_cast(this)) @@ -184,7 +186,9 @@ class Dependency : public std::enable_shared_from_this { std::mutex _always_ready_lock; }; -struct FakeSharedState final : public BasicSharedState {}; +struct FakeSharedState final : public BasicSharedState { + ENABLE_FACTORY_CREATOR(FakeSharedState) +}; struct FakeDependency final : public Dependency { public: @@ -325,6 +329,7 @@ class RuntimeFilterDependency final : public Dependency { }; struct AggSharedState : public BasicSharedState { + ENABLE_FACTORY_CREATOR(AggSharedState) public: AggSharedState() { agg_data = std::make_unique(); @@ -418,8 +423,10 @@ struct AggSharedState : public BasicSharedState { }; struct AggSpillPartition; -struct PartitionedAggSharedState : public BasicSharedState { -public: +struct PartitionedAggSharedState : public BasicSharedState, + public std::enable_shared_from_this { + ENABLE_FACTORY_CREATOR(PartitionedAggSharedState) + PartitionedAggSharedState() = default; ~PartitionedAggSharedState() override = default; @@ -480,11 +487,15 @@ struct AggSpillPartition { }; using AggSpillPartitionSPtr = std::shared_ptr; struct SortSharedState : public BasicSharedState { + ENABLE_FACTORY_CREATOR(SortSharedState) public: std::unique_ptr sorter; }; -struct SpillSortSharedState : public BasicSharedState { +struct SpillSortSharedState : public BasicSharedState, + public std::enable_shared_from_this { + ENABLE_FACTORY_CREATOR(SpillSortSharedState) + SpillSortSharedState() = default; ~SpillSortSharedState() override = default; @@ -512,6 +523,8 @@ struct SpillSortSharedState : public BasicSharedState { }; struct UnionSharedState : public BasicSharedState { + ENABLE_FACTORY_CREATOR(UnionSharedState) + public: UnionSharedState(int child_count = 1) : data_queue(child_count), _child_count(child_count) {}; int child_count() const { return _child_count; } @@ -527,6 +540,8 @@ struct MultiCastSharedState : public BasicSharedState { }; struct AnalyticSharedState : public BasicSharedState { + ENABLE_FACTORY_CREATOR(AnalyticSharedState) + public: AnalyticSharedState() = default; @@ -560,6 +575,7 @@ struct JoinSharedState : public BasicSharedState { }; struct HashJoinSharedState : public JoinSharedState { + ENABLE_FACTORY_CREATOR(HashJoinSharedState) // mark the join column whether support null eq std::vector is_null_safe_eq_join; // mark the build hash table whether it needs to store null value @@ -576,13 +592,18 @@ struct HashJoinSharedState : public JoinSharedState { bool probe_ignore_null = false; }; -struct PartitionedHashJoinSharedState : public HashJoinSharedState { +struct PartitionedHashJoinSharedState + : public HashJoinSharedState, + public std::enable_shared_from_this { + ENABLE_FACTORY_CREATOR(PartitionedHashJoinSharedState) + std::vector> partitioned_build_blocks; std::vector spilled_streams; bool need_to_spill = false; }; struct NestedLoopJoinSharedState : public JoinSharedState { + ENABLE_FACTORY_CREATOR(NestedLoopJoinSharedState) // if true, left child has no more rows to process bool left_side_eos = false; // Visited flags for each row in build side. @@ -592,6 +613,7 @@ struct NestedLoopJoinSharedState : public JoinSharedState { }; struct PartitionSortNodeSharedState : public BasicSharedState { + ENABLE_FACTORY_CREATOR(PartitionSortNodeSharedState) public: std::queue blocks_buffer; std::mutex buffer_mutex; @@ -610,6 +632,7 @@ class AsyncWriterDependency final : public Dependency { }; struct SetSharedState : public BasicSharedState { + ENABLE_FACTORY_CREATOR(SetSharedState) public: /// default init vectorized::Block build_block; // build to source diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 6ee9ccb13c4d89..484a3f1791cfd2 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -299,7 +299,7 @@ std::shared_ptr DataSinkOperatorX::create_shar return nullptr; } else { std::shared_ptr ss = nullptr; - ss.reset(new typename LocalStateType::SharedStateType()); + ss = LocalStateType::SharedStateType::create_shared(); ss->id = operator_id(); for (auto& dest : dests_id()) { ss->related_op_ids.insert(dest);