From 9b2d33c210420782be6a71ed0af7728e184e80ab Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 8 Jan 2024 11:17:09 +0800 Subject: [PATCH] Revert "[feature](pipelineX) control exchange sink by memory usage (#28814)" This reverts commit e326ebb63e4e07d8ee6595561ab19dc5d411f592. --- be/src/pipeline/exec/exchange_sink_operator.cpp | 7 +------ be/src/pipeline/exec/exchange_sink_operator.h | 7 +------ .../pipeline/exec/exchange_source_operator.cpp | 1 - be/src/vec/runtime/vdata_stream_recvr.cpp | 16 ++++------------ be/src/vec/runtime/vdata_stream_recvr.h | 9 +-------- be/src/vec/sink/vdata_stream_sender.h | 5 ----- 6 files changed, 7 insertions(+), 38 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 9f9b36d1cb2614..43bec0bd92d2e5 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -199,22 +199,17 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _wait_channel_timer.resize(local_size); auto deps_for_channels = AndDependency::create_shared( _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); - auto deps_for_channels_mem_limit = AndDependency::create_shared( - _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); - for (auto* channel : channels) { + for (auto channel : channels) { if (channel->is_local()) { _local_channels_dependency[dep_id] = channel->get_local_channel_dependency(); DCHECK(_local_channels_dependency[dep_id] != nullptr); deps_for_channels->add_child(_local_channels_dependency[dep_id]); _wait_channel_timer[dep_id] = ADD_CHILD_TIMER( _profile, fmt::format("WaitForLocalExchangeBuffer{}", dep_id), timer_name); - auto local_recvr = channel->local_recvr(); - deps_for_channels_mem_limit->add_child(local_recvr->get_mem_limit_dependency()); dep_id++; } } _exchange_sink_dependency->add_child(deps_for_channels); - _exchange_sink_dependency->add_child(deps_for_channels_mem_limit); } if (p._part_type == TPartitionType::HASH_PARTITIONED) { _partition_count = channels.size(); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 1ddfc5f6174d07..24fe9e1d84c440 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -96,14 +96,9 @@ class LocalExchangeChannelDependency final : public Dependency { LocalExchangeChannelDependency(int id, int node_id, QueryContext* query_ctx) : Dependency(id, node_id, "LocalExchangeChannelDependency", true, query_ctx) {} ~LocalExchangeChannelDependency() override = default; + // TODO(gabriel): blocked by memory }; -class LocalExchangeMemLimitDependency final : public Dependency { - ENABLE_FACTORY_CREATOR(LocalExchangeMemLimitDependency); - LocalExchangeMemLimitDependency(int id, int node_id, QueryContext* query_ctx) - : Dependency(id, node_id, "LocalExchangeMemLimitDependency", true, query_ctx) {} - ~LocalExchangeMemLimitDependency() override = default; -}; class ExchangeSinkLocalState final : public PipelineXSinkLocalState { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); using Base = PipelineXSinkLocalState; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 847891104c7b24..255cb151410fda 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -74,7 +74,6 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), profile(), p.is_merging(), p.sub_plan_query_statistics_recvr()); - stream_recvr->create_mem_limit_dependency(p.operator_id(), p.node_id(), state->get_query_ctx()); auto* source_dependency = _dependency; const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 0dc47363a8eef5..56f6c51e68490d 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -354,7 +354,8 @@ VDataStreamRecvr::VDataStreamRecvr( _profile(profile), _peak_memory_usage_counter(nullptr), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr), - _enable_pipeline(state->enable_pipeline_exec()) { + _enable_pipeline(state->enable_pipeline_exec()), + _mem_available(std::make_shared(true)) { // DataStreamRecvr may be destructed after the instance execution thread ends. _mem_tracker = std::make_unique("VDataStreamRecvr:" + print_id(_fragment_instance_id)); @@ -505,21 +506,12 @@ void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) { _blocks_memory_usage->add(size); auto val = _blocks_memory_usage_current_value.fetch_add(size); if (val + size > config::exchg_node_buffer_size_bytes) { - if (_exchange_sink_mem_limit_dependency) { - _exchange_sink_mem_limit_dependency->block(); - } + *_mem_available = false; } else { - if (_exchange_sink_mem_limit_dependency) { - _exchange_sink_mem_limit_dependency->set_ready(); - } + *_mem_available = true; } } -void VDataStreamRecvr::create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx) { - _exchange_sink_mem_limit_dependency = - pipeline::LocalExchangeMemLimitDependency::create_shared(id, node_id, query_ctx); -} - void VDataStreamRecvr::close() { if (_is_closed) { return; diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 122a9d763e1be9..e0b63459ad2d5e 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -42,7 +42,6 @@ #include "common/object_pool.h" #include "common/status.h" #include "runtime/descriptors.h" -#include "runtime/query_context.h" #include "runtime/query_statistics.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" @@ -62,7 +61,6 @@ class RuntimeState; namespace pipeline { struct ExchangeDataDependency; class LocalExchangeChannelDependency; -class LocalExchangeMemLimitDependency; class ExchangeLocalState; } // namespace pipeline @@ -132,10 +130,6 @@ class VDataStreamRecvr { std::shared_ptr get_local_channel_dependency( int sender_id); - void create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx); - - auto get_mem_limit_dependency() { return _exchange_sink_mem_limit_dependency; } - private: void update_blocks_memory_usage(int64_t size); class PipSenderQueue; @@ -195,8 +189,7 @@ class VDataStreamRecvr { std::vector> _sender_to_local_channel_dependency; - // use to limit sink write - std::shared_ptr _exchange_sink_mem_limit_dependency; + std::shared_ptr _mem_available; }; class ThreadClosure : public google::protobuf::Closure { diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 0e727a41f03e73..f59dad266f87ab 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -321,11 +321,6 @@ class Channel { void set_receiver_eof(Status st) { _receiver_status = st; } - auto local_recvr() { - DCHECK(is_local()); - return _local_recvr; - } - protected: bool _recvr_is_valid() { if (_local_recvr && !_local_recvr->is_closed()) {