From 3b67c79622a6d40eb5c1b53f73e6edd92153bce5 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Wed, 12 Feb 2025 22:12:34 +0800 Subject: [PATCH] [Bug](exec) fix agg top limit heap error cause result error --- be/src/pipeline/dependency.cpp | 12 +++ be/src/pipeline/dependency.h | 3 + .../exec/aggregation_sink_operator.cpp | 24 +---- .../operator/agg_shared_state_test.cpp | 95 +++++++++++++++++++ 4 files changed, 114 insertions(+), 20 deletions(-) create mode 100644 be/test/pipeline/operator/agg_shared_state_test.cpp diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index dcf5c7a0a81d7c..8ae28244be8f73 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -416,4 +416,16 @@ Status SetSharedState::hash_table_init() { return init_hash_method(hash_table_variants.get(), data_types, true); } +void AggSharedState::refresh_top_limit(size_t row_id, + const vectorized::ColumnRawPtrs& key_columns) { + for (int j = 0; j < key_columns.size(); ++j) { + limit_columns[j]->insert_from(*key_columns[j], row_id); + } + limit_heap.emplace(limit_columns[0]->size() - 1, limit_columns, order_directions, + null_directions); + + limit_heap.pop(); + limit_columns_min = limit_heap.top()._row_id; +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index aaa271cdd09e05..489497897dc2a3 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -384,6 +384,9 @@ struct AggSharedState : public BasicSharedState { std::priority_queue limit_heap; + // Refresh the top limit heap with a new row + void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& key_columns); + private: vectorized::MutableColumns _get_keys_hash_table(); diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 8956e8292a72d1..a2e397321a84db 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -598,23 +598,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData agg_method.init_serialized_keys(key_columns, num_rows); size_t i = 0; - auto refresh_top_limit = [&, this]() { - _shared_state->limit_heap.pop(); - for (int j = 0; j < key_columns.size(); ++j) { - _shared_state->limit_columns[j]->insert_from(*key_columns[j], - i); - } - _shared_state->limit_heap.emplace( - _shared_state->limit_columns[0]->size() - 1, - _shared_state->limit_columns, - _shared_state->order_directions, - _shared_state->null_directions); - _shared_state->limit_columns_min = - _shared_state->limit_heap.top()._row_id; - }; - - auto creator = [this, refresh_top_limit](const auto& ctor, auto& key, - auto& origin) { + auto creator = [&](const auto& ctor, auto& key, auto& origin) { try { HashMethodType::try_presis_key_and_origin(key, origin, *_agg_arena_pool); @@ -626,7 +610,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData throw Exception(st.code(), st.to_string()); } ctor(key, mapped); - refresh_top_limit(); + _shared_state->refresh_top_limit(i, key_columns); } catch (...) { // Exception-safety - if it can not allocate memory or create status, // the destructors will not be called. @@ -635,7 +619,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData } }; - auto creator_for_null_key = [this, refresh_top_limit](auto& mapped) { + auto creator_for_null_key = [&](auto& mapped) { mapped = _agg_arena_pool->aligned_alloc( Base::_parent->template cast() ._total_size_of_aggregate_states, @@ -645,7 +629,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData if (!st) { throw Exception(st.code(), st.to_string()); } - refresh_top_limit(); + _shared_state->refresh_top_limit(i, key_columns); }; SCOPED_TIMER(_hash_table_emplace_timer); diff --git a/be/test/pipeline/operator/agg_shared_state_test.cpp b/be/test/pipeline/operator/agg_shared_state_test.cpp new file mode 100644 index 00000000000000..e4ce200ed1ec5c --- /dev/null +++ b/be/test/pipeline/operator/agg_shared_state_test.cpp @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "pipeline/dependency.h" +#include "vec/columns/column_vector.h" +#include "vec/data_types/data_type_number.h" + +namespace doris::pipeline { + +class AggSharedStateTest : public testing::Test { +protected: + void SetUp() override { + _shared_state = std::make_shared(); + + // Setup test data + auto int_type = std::make_shared(); + _shared_state->limit_columns.push_back(int_type->create_column()); + + // Setup order directions (ascending) + _shared_state->order_directions = {1}; + _shared_state->null_directions = {1}; + + // Create test column + _test_column = int_type->create_column(); + auto* col_data = reinterpret_cast*>(_test_column.get()); + + // Insert test values: 5, 3, 1, -2, -1, 0 + col_data->insert(5); + col_data->insert(3); + col_data->insert(1); + col_data->insert(-1); + col_data->insert(0); + col_data->insert(2); + + _key_columns.push_back(_test_column.get()); + // prepare the heap data first [5, 3, 1, -2] + for (int i = 0; i < 4; ++i) { + for (int j = 0; j < _key_columns.size(); ++j) { + _shared_state->limit_columns[j]->insert_from(*_key_columns[j], i); + } + // build agg limit heap + _shared_state->limit_heap.emplace( + _shared_state->limit_columns[0]->size() - 1, _shared_state->limit_columns, + _shared_state->order_directions, _shared_state->null_directions); + } + // keep the top limit values, only 3 value in heap [-1, 3, 1] + _shared_state->limit_heap.pop(); + _shared_state->limit_columns_min = _shared_state->limit_heap.top()._row_id; + } + + std::shared_ptr _shared_state; + vectorized::MutableColumnPtr _test_column; + vectorized::ColumnRawPtrs _key_columns; +}; + +TEST_F(AggSharedStateTest, TestRefreshTopLimit) { + // Test with limit = 3 (keep top 3 values) + _shared_state->limit = 3; + + // Add values one by one and verify the minimum value is tracked correctly + EXPECT_EQ(_shared_state->limit_columns_min, 1); + + _shared_state->refresh_top_limit(4, _key_columns); + EXPECT_EQ(_shared_state->limit_columns_min, 2); + + _shared_state->refresh_top_limit(5, _key_columns); + EXPECT_EQ(_shared_state->limit_columns_min, 2); // 1 should still be max + + auto heap_size = _shared_state->limit_heap.size(); + EXPECT_EQ(heap_size, 3); + + EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 2); // 1 should be the top value + _shared_state->limit_heap.pop(); + EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 4); // 0 should be the top value + _shared_state->limit_heap.pop(); + EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 3); // -1 should be the top value +} + +} // namespace doris::pipeline