Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 15 additions & 25 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,8 @@ struct AggSharedState : public BasicSharedState {
std::vector<size_t> make_nullable_keys;

struct MemoryRecord {
MemoryRecord() : used_in_arena(0), used_in_state(0) {}
int64_t used_in_arena;
int64_t used_in_state;
int64_t used_in_arena {};
int64_t used_in_state {};
};
MemoryRecord mem_usage_record;
bool agg_data_created_without_key = false;
Expand All @@ -362,11 +361,7 @@ struct AggSharedState : public BasicSharedState {
_order_directions(order_directions),
_null_directions(null_directions) {}

HeapLimitCursor(const HeapLimitCursor& other) noexcept
: _row_id(other._row_id),
_limit_columns(other._limit_columns),
_order_directions(other._order_directions),
_null_directions(other._null_directions) {}
HeapLimitCursor(const HeapLimitCursor& other) = default;

HeapLimitCursor(HeapLimitCursor&& other) noexcept
: _row_id(other._row_id),
Expand Down Expand Up @@ -567,11 +562,10 @@ struct MultiCastSharedState : public BasicSharedState {
};

struct BlockRowPos {
BlockRowPos() : block_num(0), row_num(0), pos(0) {}
int64_t block_num; //the pos at which block
int64_t row_num; //the pos at which row
int64_t pos; //pos = all blocks size + row_num
std::string debug_string() {
int64_t block_num {}; //the pos at which block
int64_t row_num {}; //the pos at which row
int64_t pos {}; //pos = all blocks size + row_num
std::string debug_string() const {
std::string res = "\t block_num: ";
res += std::to_string(block_num);
res += "\t row_num: ";
Expand Down Expand Up @@ -823,14 +817,9 @@ struct DataDistribution {
DataDistribution(ExchangeType type) : distribution_type(type) {}
DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
: distribution_type(type), partition_exprs(partition_exprs_) {}
DataDistribution(const DataDistribution& other)
: distribution_type(other.distribution_type), partition_exprs(other.partition_exprs) {}
DataDistribution(const DataDistribution& other) = default;
bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
DataDistribution& operator=(const DataDistribution& other) {
distribution_type = other.distribution_type;
partition_exprs = other.partition_exprs;
return *this;
}
DataDistribution& operator=(const DataDistribution& other) = default;
ExchangeType distribution_type;
std::vector<TExpr> partition_exprs;
};
Expand All @@ -843,13 +832,14 @@ struct LocalExchangeSharedState : public BasicSharedState {
LocalExchangeSharedState(int num_instances);
std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::atomic<int64_t> mem_usage = 0;
// We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
void create_source_dependencies(int operator_id, int node_id) {
for (size_t i = 0; i < source_deps.size(); i++) {
source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
source_deps[i]->set_shared_state(this);
for (auto& source_dep : source_deps) {
source_dep = std::make_shared<Dependency>(operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
source_dep->set_shared_state(this);
}
};
void sub_running_sink_operators();
Expand Down
114 changes: 59 additions & 55 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,14 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
return Status::OK();
};

if (_running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
block, partitioned_block.first->data_block);
RETURN_IF_ERROR(get_data(block));
} else {
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
block, partitioned_block.first->data_block);
RETURN_IF_ERROR(get_data(block));
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
Expand Down Expand Up @@ -144,6 +138,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(it.second);
} else {
local_state._shared_state->sub_mem_usage(
it.second, new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
Expand All @@ -162,6 +158,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(i % _num_sources);
} else {
local_state._shared_state->sub_mem_usage(
i % _num_sources, new_block_wrapper->data_block.allocated_bytes(),
false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
Expand All @@ -181,6 +180,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(map[i]);
} else {
local_state._shared_state->sub_mem_usage(
map[i], new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
Expand All @@ -200,9 +201,12 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
} else {
local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}

return Status::OK();
Expand All @@ -220,25 +224,16 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
if (_running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
} else {
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
Expand All @@ -264,14 +259,11 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
return Status::OK();
}
vectorized::Block next_block;
if (_running_sink_operators == 0) {
if (_data_queue[0].try_dequeue(next_block)) {
*block = std::move(next_block);
} else {
*eos = true;
}
} else if (_data_queue[0].try_dequeue(next_block)) {
bool all_finished = _running_sink_operators == 0;
if (_data_queue[0].try_dequeue(next_block)) {
*block = std::move(next_block);
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
Expand All @@ -287,10 +279,14 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
}
new_block.swap(*in_block);
DCHECK_LE(local_state._channel_id, _data_queue.size());
add_mem_usage(local_state, new_block.allocated_bytes());

size_t memory_usage = new_block.allocated_bytes();
add_mem_usage(local_state, memory_usage);

if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(0);
} else {
sub_mem_usage(local_state, memory_usage);
}
if (eos) {
_queue_deps[local_state._channel_id]->set_always_ready();
Expand Down Expand Up @@ -350,6 +346,19 @@ Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block
return Status::OK();
}

void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSinkLocalState& local_state,
int64_t delta) {
const auto channel_id = local_state._channel_id;
local_state._shared_state->mem_trackers[channel_id]->release(delta);
if (_queues_mem_usege[channel_id].fetch_sub(delta) > _each_queue_limit) {
_sink_deps[channel_id]->set_ready();
}
// if queue empty , block this queue
if (_queues_mem_usege[channel_id] == 0) {
_queue_deps[channel_id]->block();
}
}

void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState& local_state,
int64_t delta) {
const auto channel_id = local_state._channel_id;
Expand Down Expand Up @@ -412,14 +421,11 @@ void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
if (_running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
*block = std::move(next_block);
} else {
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
*block = std::move(next_block);
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
Expand All @@ -436,9 +442,12 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
} else {
local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}

return Status::OK();
Expand Down Expand Up @@ -494,9 +503,13 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
vectorized::MutableBlock::create_unique(block->clone_empty());
RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
auto new_block = mutable_block->to_block();
local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes());

size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(i, memory_usage);
if (data_queue[i].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(i);
} else {
local_state._shared_state->sub_mem_usage(i, memory_usage);
}
}
}
Expand All @@ -519,25 +532,16 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
if (_running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
} else {
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ class LocalMergeSortExchanger final : public Exchanger<vectorized::Block> {
std::vector<Dependency*> local_state_dependency(int channel_id) override;

void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta);
void sub_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta);
void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int channel_id, int64_t delta);
void close(LocalExchangeSourceLocalState& local_state) override {}

Expand Down