Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,11 @@ DEFINE_mInt32(jdbc_connection_pool_cache_clear_time_sec, "28800");

// Global bitmap cache capacity for aggregation cache, size in bytes
DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600");
// The default delete bitmap cache is set to 100MB,
// which can be insufficient and cause performance issues when the amount of user data is large.
// To mitigate the problem of an inadequate cache,
// we will take the larger of 0.5% of the total memory and 100MB as the delete bitmap cache size.
DEFINE_String(delete_bitmap_dynamic_agg_cache_limit, "0.5%");
DEFINE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec, "1800");

// reference https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ DECLARE_mInt32(jdbc_connection_pool_cache_clear_time_sec);

// Global bitmap cache capacity for aggregation cache, size in bytes
DECLARE_Int64(delete_bitmap_agg_cache_capacity);
DECLARE_String(delete_bitmap_dynamic_agg_cache_limit);
DECLARE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec);

// A common object cache depends on an Sharded LRU Cache.
Expand Down
15 changes: 14 additions & 1 deletion be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include "olap/tablet_meta_manager.h"
#include "olap/utils.h"
#include "util/debug_points.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
#include "util/string_util.h"
#include "util/time.h"
#include "util/uid_util.h"
Expand Down Expand Up @@ -924,7 +926,18 @@ bool operator!=(const TabletMeta& a, const TabletMeta& b) {
}

DeleteBitmap::DeleteBitmap(int64_t tablet_id) : _tablet_id(tablet_id) {
_agg_cache.reset(new AggCache(config::delete_bitmap_agg_cache_capacity));
// The default delete bitmap cache is set to 100MB,
// which can be insufficient and cause performance issues when the amount of user data is large.
// To mitigate the problem of an inadequate cache,
// we will take the larger of 0.5% of the total memory and 100MB as the delete bitmap cache size.
bool is_percent = false;
int64_t delete_bitmap_agg_cache_cache_limit =
ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
_agg_cache.reset(new AggCache(delete_bitmap_agg_cache_cache_limit >
config::delete_bitmap_agg_cache_capacity
? delete_bitmap_agg_cache_cache_limit
: config::delete_bitmap_agg_cache_capacity));
}

DeleteBitmap::DeleteBitmap(const DeleteBitmap& o) {
Expand Down
30 changes: 18 additions & 12 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat
if (Base::_closed) {
return Status::OK();
}
<<<<<<< HEAD
{
std::unique_lock<std::mutex> lk(_spill_lock);
if (_is_spilling) {
_spill_cv.wait(lk);
}
}
=======
dec_running_big_mem_op_num(state);
>>>>>>> bb11955709 ([improvement](spill) avoid spill if memory is enough (#33075))
return Base::close(state, exec_status);
}

Expand Down Expand Up @@ -166,13 +170,17 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
auto* runtime_state = local_state._runtime_state.get();
RETURN_IF_ERROR(_agg_sink_operator->sink(runtime_state, in_block, false));
if (eos) {
LOG(INFO) << "agg node " << id() << " sink eos";
if (revocable_mem_size(state) > 0) {
RETURN_IF_ERROR(revoke_memory(state));
} else {
for (auto& partition : local_state._shared_state->spill_partitions) {
RETURN_IF_ERROR(partition->finish_current_spilling(eos));
if (local_state._shared_state->is_spilled) {
if (revocable_mem_size(state) > 0) {
RETURN_IF_ERROR(revoke_memory(state));
} else {
for (auto& partition : local_state._shared_state->spill_partitions) {
RETURN_IF_ERROR(partition->finish_current_spilling(eos));
}
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
} else {
local_state._dependency->set_ready_to_read();
}
}
Expand Down Expand Up @@ -229,8 +237,10 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
LOG(INFO) << "agg node " << Base::_parent->id() << " revoke_memory"
<< ", eos: " << _eos;
RETURN_IF_ERROR(Base::_shared_state->sink_status);
DCHECK(!_is_spilling);
_is_spilling = true;
if (!_shared_state->is_spilled) {
_shared_state->is_spilled = true;
profile()->add_info_string("Spilled", "true");
}

// TODO: spill thread may set_ready before the task::execute thread put the task to blocked state
if (!_eos) {
Expand All @@ -240,7 +250,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
Status status;
Defer defer {[&]() {
if (!status.ok()) {
_is_spilling = false;
if (!_eos) {
Base::_dependency->Dependency::set_ready();
}
Expand Down Expand Up @@ -269,15 +278,12 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
<< ", eos: " << _eos;
}
{
std::unique_lock<std::mutex> lk(_spill_lock);
_is_spilling = false;
if (_eos) {
Base::_dependency->set_ready_to_read();
_finish_dependency->set_ready();
} else {
Base::_dependency->Dependency::set_ready();
}
_spill_cv.notify_one();
}
}};
auto* runtime_state = _runtime_state.get();
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,6 @@ class PartitionedAggSinkLocalState

bool _eos = false;
std::shared_ptr<Dependency> _finish_dependency;
bool _is_spilling = false;
std::mutex _spill_lock;
std::condition_variable _spill_cv;

/// Resources in shared state will be released when the operator is closed,
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
Expand Down
29 changes: 12 additions & 17 deletions be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
{
std::unique_lock<std::mutex> lk(_merge_spill_lock);
if (_is_merging) {
_merge_spill_cv.wait(lk);
}
}
dec_running_big_mem_op_num(state);
return Base::close(state);
}
PartitionedAggSourceOperatorX::PartitionedAggSourceOperatorX(ObjectPool* pool,
Expand Down Expand Up @@ -131,13 +126,16 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);

RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state));
if (local_state._shared_state->is_spilled) {
RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state));

/// When `_is_merging` is true means we are reading spilled data and merging the data into hash table.
if (local_state._is_merging) {
return Status::OK();
/// When `_is_merging` is true means we are reading spilled data and merging the data into hash table.
if (local_state._is_merging) {
return Status::OK();
}
}

// not spilled in sink or current partition still has data
auto* runtime_state = local_state._runtime_state.get();
RETURN_IF_ERROR(_agg_source_operator->get_block(runtime_state, block, eos));
if (local_state._runtime_state) {
Expand All @@ -146,7 +144,8 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
local_state.update_profile(source_local_state->profile());
}
if (*eos) {
if (!local_state._shared_state->spill_partitions.empty()) {
if (local_state._shared_state->is_spilled &&
!local_state._shared_state->spill_partitions.empty()) {
*eos = false;
}
}
Expand Down Expand Up @@ -218,12 +217,8 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
}
Base::_shared_state->in_mem_shared_state->aggregate_data_container
->init_once();
{
std::unique_lock<std::mutex> lk(_merge_spill_lock);
_is_merging = false;
_dependency->Dependency::set_ready();
_merge_spill_cv.notify_one();
}
_is_merging = false;
_dependency->Dependency::set_ready();
}};
bool has_agg_data = false;
auto& parent = Base::_parent->template cast<Parent>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ class PartitionedAggLocalState final : public PipelineXSpillLocalState<Partition
std::future<Status> _spill_merge_future;
bool _current_partition_eos = true;
bool _is_merging = false;
std::mutex _merge_spill_lock;
std::condition_variable _merge_spill_cv;

/// Resources in shared state will be released when the operator is closed,
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
Expand Down
43 changes: 22 additions & 21 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,9 @@ Status SpillSortSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}
Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) {
{
std::unique_lock<std::mutex> lk(_spill_lock);
if (_is_spilling) {
_spill_cv.wait(lk);
}
auto& parent = Base::_parent->template cast<Parent>();
if (parent._enable_spill) {
dec_running_big_mem_op_num(state);
}
return Status::OK();
}
Expand Down Expand Up @@ -172,9 +170,16 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
local_state._shared_state->in_mem_shared_state->sorter->data_size());
if (eos) {
if (_enable_spill) {
if (revocable_mem_size(state) > 0) {
RETURN_IF_ERROR(revoke_memory(state));
if (local_state._shared_state->is_spilled) {
if (revocable_mem_size(state) > 0) {
RETURN_IF_ERROR(revoke_memory(state));
} else {
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
} else {
RETURN_IF_ERROR(
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
local_state._dependency->set_ready_to_read();
}
} else {
Expand All @@ -186,8 +191,10 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
return Status::OK();
}
Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
DCHECK(!_is_spilling);
_is_spilling = true;
if (!_shared_state->is_spilled) {
_shared_state->is_spilled = true;
profile()->add_info_string("Spilled", "true");
}

LOG(INFO) << "sort node " << Base::_parent->id() << " revoke_memory"
<< ", eos: " << _eos;
Expand Down Expand Up @@ -243,17 +250,12 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
_shared_state->clear();
}

{
std::unique_lock<std::mutex> lk(_spill_lock);
_spilling_stream.reset();
_is_spilling = false;
if (_eos) {
_dependency->set_ready_to_read();
_finish_dependency->set_ready();
} else {
_dependency->Dependency::set_ready();
}
_spill_cv.notify_one();
_spilling_stream.reset();
if (_eos) {
_dependency->set_ready_to_read();
_finish_dependency->set_ready();
} else {
_dependency->Dependency::set_ready();
}
}};

Expand Down Expand Up @@ -288,7 +290,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
return Status::OK();
});
if (!status.ok()) {
_is_spilling = false;
_spilling_stream->end_spill(status);

if (!_eos) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/spill_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,8 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha
RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;

bool _eos = false;
bool _is_spilling = false;
vectorized::SpillStreamSPtr _spilling_stream;
std::shared_ptr<Dependency> _finish_dependency;
std::mutex _spill_lock;
std::condition_variable _spill_cv;
};

class SpillSortSinkOperatorX final : public DataSinkOperatorX<SpillSortSinkLocalState> {
Expand Down
29 changes: 7 additions & 22 deletions be/src/pipeline/exec/spill_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,10 @@ Status SpillSortLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
{
std::unique_lock<std::mutex> lk(_merge_spill_lock);
if (_is_merging) {
_merge_spill_cv.wait(lk);
}
if (Base::_shared_state->enable_spill) {
dec_running_big_mem_op_num(state);
}
RETURN_IF_ERROR(Base::close(state));
for (auto& stream : _current_merging_streams) {
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
}
_current_merging_streams.clear();
return Status::OK();
}
int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
Expand All @@ -78,14 +71,11 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) {
auto& parent = Base::_parent->template cast<Parent>();
LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data";
DCHECK(!_is_merging);
_is_merging = true;
_dependency->Dependency::block();

Status status;
Defer defer {[&]() {
if (!status.ok()) {
_is_merging = false;
_dependency->Dependency::set_ready();
}
}};
Expand All @@ -108,12 +98,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
} else {
LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data finish";
}
{
std::unique_lock<std::mutex> lk(_merge_spill_lock);
_is_merging = false;
_dependency->Dependency::set_ready();
_merge_spill_cv.notify_one();
}
_dependency->Dependency::set_ready();
}};
vectorized::Block merge_sorted_block;
vectorized::SpillStreamSPtr tmp_stream;
Expand Down Expand Up @@ -252,15 +237,15 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);

if (!local_state.Base::_shared_state->enable_spill) {
RETURN_IF_ERROR(
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos));
} else {
if (local_state.Base::_shared_state->enable_spill && local_state._shared_state->is_spilled) {
if (!local_state._merger) {
return local_state.initiate_merge_sort_spill_streams(state);
} else {
RETURN_IF_ERROR(local_state._merger->get_next(block, eos));
}
} else {
RETURN_IF_ERROR(
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos));
}
local_state.reached_limit(block, eos);
return Status::OK();
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/spill_sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ class SpillSortLocalState final : public PipelineXSpillLocalState<SpillSortShare
int64_t _external_sort_bytes_threshold = 134217728; // 128M
std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
bool _is_merging = false;
std::mutex _merge_spill_lock;
std::condition_variable _merge_spill_cv;

std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
// counters for spill merge sort
Expand Down
9 changes: 7 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node);
auto new_pipe = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
if (agg_node->is_aggregate_evaluators_empty()) {
if (agg_node->is_probe_expr_ctxs_empty() && node->row_desc().num_slots() == 0) {
return Status::InternalError("Illegal aggregate node " +
std::to_string(agg_node->id()) +
": group by and output is empty");
}
if (agg_node->is_aggregate_evaluators_empty() && !agg_node->is_probe_expr_ctxs_empty()) {
auto data_queue = std::make_shared<DataQueue>(1);
OperatorBuilderPtr pre_agg_sink =
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
Expand All @@ -570,7 +575,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
node->id(), agg_node, data_queue);
RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
} else if (agg_node->is_streaming_preagg()) {
} else if (agg_node->is_streaming_preagg() && !agg_node->is_probe_expr_ctxs_empty()) {
auto data_queue = std::make_shared<DataQueue>(1);
OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>(
node->id(), agg_node, data_queue);
Expand Down
Loading