Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,22 +199,17 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_wait_channel_timer.resize(local_size);
auto deps_for_channels = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
auto deps_for_channels_mem_limit = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
for (auto* channel : channels) {
for (auto channel : channels) {
if (channel->is_local()) {
_local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
DCHECK(_local_channels_dependency[dep_id] != nullptr);
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
_wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
_profile, fmt::format("WaitForLocalExchangeBuffer{}", dep_id), timer_name);
auto local_recvr = channel->local_recvr();
deps_for_channels_mem_limit->add_child(local_recvr->get_mem_limit_dependency());
dep_id++;
}
}
_exchange_sink_dependency->add_child(deps_for_channels);
_exchange_sink_dependency->add_child(deps_for_channels_mem_limit);
}
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
Expand Down
7 changes: 1 addition & 6 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,9 @@ class LocalExchangeChannelDependency final : public Dependency {
LocalExchangeChannelDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeChannelDependency", true, query_ctx) {}
~LocalExchangeChannelDependency() override = default;
// TODO(gabriel): blocked by memory
};

class LocalExchangeMemLimitDependency final : public Dependency {
ENABLE_FACTORY_CREATOR(LocalExchangeMemLimitDependency);
LocalExchangeMemLimitDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeMemLimitDependency", true, query_ctx) {}
~LocalExchangeMemLimitDependency() override = default;
};
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<AndDependency>;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
stream_recvr->create_mem_limit_dependency(p.operator_id(), p.node_id(), state->get_query_ctx());
auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
Expand Down
16 changes: 4 additions & 12 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ VDataStreamRecvr::VDataStreamRecvr(
_profile(profile),
_peak_memory_usage_counter(nullptr),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
_enable_pipeline(state->enable_pipeline_exec()) {
_enable_pipeline(state->enable_pipeline_exec()),
_mem_available(std::make_shared<bool>(true)) {
// DataStreamRecvr may be destructed after the instance execution thread ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
Expand Down Expand Up @@ -505,21 +506,12 @@ void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
auto val = _blocks_memory_usage_current_value.fetch_add(size);
if (val + size > config::exchg_node_buffer_size_bytes) {
if (_exchange_sink_mem_limit_dependency) {
_exchange_sink_mem_limit_dependency->block();
}
*_mem_available = false;
} else {
if (_exchange_sink_mem_limit_dependency) {
_exchange_sink_mem_limit_dependency->set_ready();
}
*_mem_available = true;
}
}

void VDataStreamRecvr::create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx) {
_exchange_sink_mem_limit_dependency =
pipeline::LocalExchangeMemLimitDependency::create_shared(id, node_id, query_ctx);
}

void VDataStreamRecvr::close() {
if (_is_closed) {
return;
Expand Down
9 changes: 1 addition & 8 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
Expand All @@ -62,7 +61,6 @@ class RuntimeState;
namespace pipeline {
struct ExchangeDataDependency;
class LocalExchangeChannelDependency;
class LocalExchangeMemLimitDependency;
class ExchangeLocalState;
} // namespace pipeline

Expand Down Expand Up @@ -132,10 +130,6 @@ class VDataStreamRecvr {
std::shared_ptr<pipeline::LocalExchangeChannelDependency> get_local_channel_dependency(
int sender_id);

void create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx);

auto get_mem_limit_dependency() { return _exchange_sink_mem_limit_dependency; }

private:
void update_blocks_memory_usage(int64_t size);
class PipSenderQueue;
Expand Down Expand Up @@ -195,8 +189,7 @@ class VDataStreamRecvr {
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
_sender_to_local_channel_dependency;

// use to limit sink write
std::shared_ptr<pipeline::LocalExchangeMemLimitDependency> _exchange_sink_mem_limit_dependency;
std::shared_ptr<bool> _mem_available;
};

class ThreadClosure : public google::protobuf::Closure {
Expand Down
5 changes: 0 additions & 5 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,6 @@ class Channel {

void set_receiver_eof(Status st) { _receiver_status = st; }

auto local_recvr() {
DCHECK(is_local());
return _local_recvr;
}

protected:
bool _recvr_is_valid() {
if (_local_recvr && !_local_recvr->is_closed()) {
Expand Down