From 4436e850dfd85ca06e67352ffa455d71a0c8ca64 Mon Sep 17 00:00:00 2001 From: zzzxl1993 Date: Fri, 8 Nov 2024 17:45:15 +0800 Subject: [PATCH] [feature](inverted index) multi_topn function add --- .../aggregate_functions/aggregate_function.h | 1 + .../aggregate_function_approx_top.h | 29 ++ .../aggregate_function_approx_top_k.cpp | 85 +++++ .../aggregate_function_approx_top_k.h | 290 +++++++++++++++ .../aggregate_function_simple_factory.cpp | 6 +- be/src/vec/common/arena_with_free_lists.h | 104 ++++++ be/src/vec/common/bit_helpers.h | 45 +++ be/src/vec/common/space_saving.h | 342 ++++++++++++++++++ be/src/vec/exprs/vectorized_agg_fn.cpp | 16 +- be/test/common/space_saving_test.cpp | 300 +++++++++++++++ .../catalog/BuiltinAggregateFunctions.java | 2 + .../expressions/functions/agg/ApproxTopK.java | 84 +++++ .../visitor/AggregateFunctionVisitor.java | 5 + .../test_index_approx_top_k.out | 133 +++++++ .../test_index_approx_top_k.groovy | 161 +++++++++ 15 files changed, 1597 insertions(+), 6 deletions(-) create mode 100644 be/src/vec/aggregate_functions/aggregate_function_approx_top.h create mode 100644 be/src/vec/aggregate_functions/aggregate_function_approx_top_k.cpp create mode 100644 be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h create mode 100644 be/src/vec/common/arena_with_free_lists.h create mode 100644 be/src/vec/common/bit_helpers.h create mode 100644 be/src/vec/common/space_saving.h create mode 100644 be/test/common/space_saving_test.cpp create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ApproxTopK.java create mode 100644 regression-test/data/inverted_index_p0/test_index_approx_top_k.out create mode 100644 regression-test/suites/inverted_index_p0/test_index_approx_top_k.groovy diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index 39de0324d1415f..e9148716f99f35 100644 --- 
a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -43,6 +43,7 @@ class IDataType; struct AggregateFunctionAttr { bool enable_decimal256 {false}; + std::vector> column_infos; }; template diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_top.h b/be/src/vec/aggregate_functions/aggregate_function_approx_top.h new file mode 100644 index 00000000000000..7885321bba3e11 --- /dev/null +++ b/be/src/vec/aggregate_functions/aggregate_function_approx_top.h @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "vec/core/types.h" + +namespace doris::vectorized { + +class AggregateFunctionApproxTop { +public: + static inline constexpr UInt64 TOP_K_MAX_SIZE = 0xFFFFFF; +}; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.cpp b/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.cpp new file mode 100644 index 00000000000000..d6298881a90630 --- /dev/null +++ b/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.cpp @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/aggregate_functions/aggregate_function_approx_top_k.h" + +#include "common/exception.h" +#include "vec/aggregate_functions/aggregate_function_simple_factory.h" +#include "vec/aggregate_functions/helpers.h" +#include "vec/data_types/data_type.h" + +namespace doris::vectorized { + +int32_t is_valid_const_columns(const std::vector& is_const_columns) { + int32_t true_count = 0; + bool found_false_after_true = false; + for (int32_t i = is_const_columns.size() - 1; i >= 0; --i) { + if (is_const_columns[i]) { + true_count++; + if (found_false_after_true) { + return false; + } + } else { + if (true_count > 2) { + return false; + } + found_false_after_true = true; + } + } + if (true_count > 2) { + throw Exception(ErrorCode::INVALID_ARGUMENT, "Invalid is_const_columns configuration"); + } + return true_count; +} + +AggregateFunctionPtr create_aggregate_function_approx_top_k(const std::string& name, + const DataTypes& argument_types, + const bool result_is_nullable, + const AggregateFunctionAttr& attr) { + if (argument_types.empty()) { + return nullptr; + } + + std::vector is_const_columns; + std::vector column_names; + for (const auto& [name, is_const] : attr.column_infos) { + is_const_columns.push_back(is_const); + if (!is_const) { + column_names.push_back(name); + } + } + + int32_t true_count = is_valid_const_columns(is_const_columns); + if (true_count == 0) { + return creator_without_type::create>( + argument_types, result_is_nullable, column_names); + } else if (true_count == 1) { + return creator_without_type::create>( + argument_types, result_is_nullable, column_names); + } else if (true_count == 2) { + return creator_without_type::create>( + argument_types, result_is_nullable, column_names); + } else { + return nullptr; + } +} + +void register_aggregate_function_approx_top_k(AggregateFunctionSimpleFactory& factory) { + factory.register_function_both("approx_top_k", create_aggregate_function_approx_top_k); +} + +} // namespace doris::vectorized \ No 
newline at end of file diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h b/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h new file mode 100644 index 00000000000000..7253ae8a96e200 --- /dev/null +++ b/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/aggregate_function_approx_top.h" +#include "vec/columns/column.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_struct.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/common/space_saving.h" +#include "vec/common/string_ref.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_ipv4.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_struct.h" +#include "vec/io/io_helper.h" + +namespace doris::vectorized { + +inline constexpr UInt64 TOP_K_MAX_SIZE = 0xFFFFFF; + +struct AggregateFunctionTopKGenericData { + using Set = SpaceSaving; + + Set value; +}; + +template +class AggregateFunctionApproxTopK final + : public IAggregateFunctionDataHelper>, + AggregateFunctionApproxTop { +private: + using State = AggregateFunctionTopKGenericData; + +public: + AggregateFunctionApproxTopK(std::vector column_names, + const DataTypes& argument_types_) + : IAggregateFunctionDataHelper>(argument_types_), + _column_names(std::move(column_names)) {} + + String get_name() const override { return "approx_top_k"; } + + DataTypePtr get_return_type() const override { return std::make_shared(); } + + // Serializes the aggregate function's state (including the SpaceSaving structure and threshold) into a buffer. 
+ void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { + this->data(place).value.write(buf); + + write_var_uint(_column_names.size(), buf); + for (const auto& column_name : _column_names) { + write_string_binary(column_name, buf); + } + write_var_uint(_threshold, buf); + write_var_uint(_reserved, buf); + } + + // Deserializes the aggregate function's state from a buffer (including the SpaceSaving structure and threshold). + void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, + Arena* arena) const override { + auto readStringBinaryInto = [](Arena& arena, BufferReadable& buf) { + size_t size = 0; + read_var_uint(size, buf); + + if (UNLIKELY(size > DEFAULT_MAX_STRING_SIZE)) { + throw Exception(ErrorCode::INTERNAL_ERROR, "Too large string size."); + } + + char* data = arena.alloc(size); + buf.read(data, size); + + return StringRef(data, size); + }; + + auto& set = this->data(place).value; + set.clear(); + + size_t size = 0; + read_var_uint(size, buf); + if (UNLIKELY(size > TOP_K_MAX_SIZE)) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "Too large size ({}) for aggregate function '{}' state (maximum is {})", + size, get_name(), TOP_K_MAX_SIZE); + } + + set.resize(size); + for (size_t i = 0; i < size; ++i) { + auto ref = readStringBinaryInto(*arena, buf); + uint64_t count = 0; + uint64_t error = 0; + read_var_uint(count, buf); + read_var_uint(error, buf); + set.insert(ref, count, error); + arena->rollback(ref.size); + } + + set.read_alpha_map(buf); + + uint64_t column_size = 0; + read_var_uint(column_size, buf); + _column_names.clear(); + for (uint64_t i = 0; i < column_size; i++) { + std::string column_name; + read_string_binary(column_name, buf); + _column_names.emplace_back(std::move(column_name)); + } + read_var_uint(_threshold, buf); + read_var_uint(_reserved, buf); + } + + // Adds a new row of data to the aggregate function (inserts a new value into the SpaceSaving structure). 
+ void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, + Arena* arena) const override { + if (!_init_flag) { + lazy_init(columns, row_num); + } + + auto& set = this->data(place).value; + if (set.capacity() != _reserved) { + set.resize(_reserved); + } + + auto all_serialize_value_into_arena = + [](size_t i, size_t keys_size, const IColumn** columns, Arena* arena) -> StringRef { + const char* begin = nullptr; + + size_t sum_size = 0; + for (size_t j = 0; j < keys_size; ++j) { + sum_size += columns[j]->serialize_value_into_arena(i, *arena, begin).size; + } + + return {begin, sum_size}; + }; + + StringRef str_serialized = + all_serialize_value_into_arena(row_num, _column_names.size(), columns, arena); + set.insert(str_serialized); + arena->rollback(str_serialized.size); + } + + void add_many(AggregateDataPtr __restrict place, const IColumn** columns, + std::vector& rows, Arena* arena) const override { + for (auto row : rows) { + add(place, columns, row, arena); + } + } + + void reset(AggregateDataPtr __restrict place) const override { + this->data(place).value.clear(); + } + + // Merges the state of another aggregate function into the current one (merges two SpaceSaving sets). 
+ void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, + Arena*) const override { + auto& rhs_set = this->data(rhs).value; + if (!rhs_set.size()) { + return; + } + + auto& set = this->data(place).value; + if (set.capacity() != _reserved) { + set.resize(_reserved); + } + set.merge(rhs_set); + } + + void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { + auto& data_to = assert_cast(to); + + const typename State::Set& set = this->data(place).value; + auto result_vec = set.top_k(_threshold); + + rapidjson::StringBuffer buffer; + rapidjson::PrettyWriter writer(buffer); + writer.StartArray(); + for (auto& result : result_vec) { + auto argument_types = this->get_argument_types(); + MutableColumns argument_columns(_column_names.size()); + for (size_t i = 0; i < _column_names.size(); ++i) { + argument_columns[i] = argument_types[i]->create_column(); + } + rapidjson::StringBuffer sub_buffer; + rapidjson::Writer sub_writer(sub_buffer); + sub_writer.StartObject(); + const char* begin = result.key.data; + for (size_t i = 0; i < _column_names.size(); i++) { + begin = argument_columns[i]->deserialize_and_insert_from_arena(begin); + std::string row_str = argument_types[i]->to_string(*argument_columns[i], 0); + sub_writer.Key(_column_names[i].data(), _column_names[i].size()); + sub_writer.String(row_str.data(), row_str.size()); + } + sub_writer.Key("count"); + sub_writer.String(std::to_string(result.count).c_str()); + sub_writer.EndObject(); + writer.RawValue(sub_buffer.GetString(), sub_buffer.GetSize(), rapidjson::kObjectType); + } + writer.EndArray(); + std::string res = buffer.GetString(); + data_to.insert_data(res.data(), res.size()); + } + +private: + void lazy_init(const IColumn** columns, ssize_t row_num) const { + auto get_param = [](size_t idx, const DataTypes& data_types, + const IColumn** columns) -> uint64_t { + const auto& data_type = data_types.at(idx); + const IColumn* column = columns[idx]; + + const auto* 
type = data_type.get(); + if (type->is_nullable()) { + type = assert_cast(type) + ->get_nested_type() + .get(); + } + int64_t value = 0; + WhichDataType which(type); + if (which.idx == TypeIndex::Int8) { + value = assert_cast(column) + ->get_element(0); + } else if (which.idx == TypeIndex::Int16) { + value = assert_cast(column) + ->get_element(0); + } else if (which.idx == TypeIndex::Int32) { + value = assert_cast(column) + ->get_element(0); + } + if (value <= 0) { + throw Exception(ErrorCode::INVALID_ARGUMENT, + "The parameter cannot be less than or equal to 0."); + } + return value; + }; + + const auto& data_types = this->get_argument_types(); + if (ArgsSize == 1) { + _threshold = + std::min(get_param(_column_names.size(), data_types, columns), (uint64_t)1000); + } else if (ArgsSize == 2) { + _threshold = + std::min(get_param(_column_names.size(), data_types, columns), (uint64_t)1000); + _reserved = std::min( + std::max(get_param(_column_names.size() + 1, data_types, columns), _threshold), + (uint64_t)1000); + } + + if (_threshold == 0 || _reserved == 0 || _threshold > 1000 || _reserved > 1000) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "approx_top_k param error, _threshold: {}, _reserved: {}", _threshold, + _reserved); + } + + _init_flag = true; + } + + mutable std::vector _column_names; + mutable bool _init_flag = false; + mutable uint64_t _threshold = 10; + mutable uint64_t _reserved = 300; +}; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp index daf3884cc285ed..dd379a41b19983 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp @@ -72,6 +72,7 @@ void register_aggregate_function_covar_pop(AggregateFunctionSimpleFactory& facto void 
register_aggregate_function_covar_samp(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_skewness(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_kurtosis(AggregateFunctionSimpleFactory& factory); +void register_aggregate_function_approx_top_k(AggregateFunctionSimpleFactory& factory); AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { static std::once_flag oc; @@ -114,20 +115,17 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { register_aggregate_function_linear_histogram(instance); register_aggregate_function_map_agg(instance); register_aggregate_function_bitmap_agg(instance); - register_aggregate_function_stddev_variance_samp(instance); register_aggregate_function_replace_reader_load(instance); register_aggregate_function_window_lead_lag_first_last(instance); register_aggregate_function_HLL_union_agg(instance); - register_aggregate_functions_corr(instance); register_aggregate_function_covar_pop(instance); register_aggregate_function_covar_samp(instance); - register_aggregate_function_combinator_foreach(instance); - register_aggregate_function_skewness(instance); register_aggregate_function_kurtosis(instance); + register_aggregate_function_approx_top_k(instance); }); return instance; } diff --git a/be/src/vec/common/arena_with_free_lists.h b/be/src/vec/common/arena_with_free_lists.h new file mode 100644 index 00000000000000..d8ea9542a758b9 --- /dev/null +++ b/be/src/vec/common/arena_with_free_lists.h @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ArenaWithFreeLists.h +// and modified by Doris + +#pragma once + +#include "vec/common/arena.h" +#if __has_include() +#include +#endif +#include "vec/common/bit_helpers.h" + +namespace doris::vectorized { + +class ArenaWithFreeLists : private Allocator, private boost::noncopyable { +private: +#if defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wzero-length-array" +#elif defined(__GNUC__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wzero-length-bounds" +#endif + + union Block { + Block* next; + char data[0]; + }; + +#if defined(__clang__) +#pragma clang diagnostic pop +#elif defined(__GNUC__) +#pragma GCC diagnostic pop +#endif + + static constexpr size_t max_fixed_block_size = 65536; + + static size_t find_free_list_index(const size_t size) { + return size <= 8 ? 
2 : bit_scan_reverse(size - 1); + } + + Arena pool; + + Block* free_lists[16] {}; + +public: + explicit ArenaWithFreeLists(const size_t initial_size = 4096, const size_t growth_factor = 2, + const size_t linear_growth_threshold = 128 * 1024 * 1024) + : pool {initial_size, growth_factor, linear_growth_threshold} {} + + char* alloc(const size_t size) { + if (size > max_fixed_block_size) { + return static_cast(Allocator::alloc(size)); + } + + const auto list_idx = find_free_list_index(size); + + if (auto& free_block_ptr = free_lists[list_idx]) { + ASAN_UNPOISON_MEMORY_REGION(free_block_ptr, std::max(size, sizeof(Block))); + + auto* const res = free_block_ptr->data; + free_block_ptr = free_block_ptr->next; + return res; + } + + return pool.alloc(1ULL << (list_idx + 1)); + } + + void free(char* ptr, const size_t size) { + if (size > max_fixed_block_size) { + Allocator::free(ptr, size); + return; + } + + const auto list_idx = find_free_list_index(size); + + auto& free_block_ptr = free_lists[list_idx]; + auto* const old_head = free_block_ptr; + free_block_ptr = reinterpret_cast(ptr); + free_block_ptr->next = old_head; + + ASAN_POISON_MEMORY_REGION(ptr, 1ULL << (list_idx + 1)); + } + + size_t allocated_bytes() const { return pool.size(); } +}; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/common/bit_helpers.h b/be/src/vec/common/bit_helpers.h new file mode 100644 index 00000000000000..13db63e4df5dc0 --- /dev/null +++ b/be/src/vec/common/bit_helpers.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/BitHelpers.h +// and modified by Doris + +#pragma once + +namespace doris::vectorized { + +template +inline uint32_t get_leading_zero_bits_unsafe(T x) { + assert(x != 0); + + if constexpr (sizeof(T) <= sizeof(unsigned int)) { + return __builtin_clz(x); + } else if constexpr (sizeof(T) <= sizeof(unsigned long int)) /// NOLINT + { + return __builtin_clzl(x); + } else { + return __builtin_clzll(x); + } +} + +template +inline uint32_t bit_scan_reverse(T x) { + return (std::max(sizeof(T), sizeof(unsigned int))) * 8 - 1 - + get_leading_zero_bits_unsafe(x); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/common/space_saving.h b/be/src/vec/common/space_saving.h new file mode 100644 index 00000000000000..18fa216228ddfb --- /dev/null +++ b/be/src/vec/common/space_saving.h @@ -0,0 +1,342 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/SpaceSaving.h +// and modified by Doris + +#pragma once + +#include + +#include "vec/common/arena_with_free_lists.h" +#include "vec/common/hash_table/hash_map.h" +#include "vec/common/string_buffer.hpp" +#include "vec/io/io_helper.h" + +namespace doris::vectorized { + +template +struct SpaceSavingArena { + SpaceSavingArena() = default; + TKey emplace(const TKey& key) { return key; } + void free(const TKey& /*key*/) {} +}; + +template <> +struct SpaceSavingArena { + StringRef emplace(StringRef key) { + if (!key.data) { + return key; + } + + return copy_string_in_arena(arena, key); + } + + void free(StringRef key) { + if (key.data) { + arena.free(const_cast(key.data), key.size); + } + } + + template + inline StringRef copy_string_in_arena(Arena& arena, StringRef value) { + size_t value_size = value.size; + char* place_for_key = arena.alloc(value_size); + memcpy(reinterpret_cast(place_for_key), reinterpret_cast(value.data), + value_size); + StringRef result {place_for_key, value_size}; + + return result; + } + +private: + ArenaWithFreeLists arena; +}; + +template > +class SpaceSaving { +private: + // This function calculates the next size for the alpha map based on the given value `x`. + // It uses `alpha_map_elements_per_counter` to estimate the required number of elements. 
+ constexpr uint64_t next_alpha_size(uint64_t x) { + constexpr uint64_t alpha_map_elements_per_counter = 6; + return 1ULL << (sizeof(uint64_t) * 8 - + std::countl_zero(x * alpha_map_elements_per_counter)); + } + +public: + using Self = SpaceSaving; + + struct Counter { + Counter() = default; + + explicit Counter(const TKey& k, uint64_t c = 0, uint64_t e = 0, size_t h = 0) + : key(k), hash(h), count(c), error(e) {} + + void write(BufferWritable& wb) const { + write_binary(key, wb); + write_var_uint(count, wb); + write_var_uint(error, wb); + } + + void read(BufferReadable& rb) { + read_binary(key, rb); + read_var_uint(count, rb); + read_var_uint(error, rb); + } + + bool operator>(const Counter& b) const { + return (count > b.count) || (count == b.count && error < b.error); + } + + TKey key; + size_t slot = 0; + size_t hash = 0; + uint64_t count = 0; + uint64_t error = 0; + }; + + explicit SpaceSaving(size_t c = 10) : alpha_map(next_alpha_size(c)), m_capacity(c) {} + + ~SpaceSaving() { destroy_elements(); } + + size_t size() const { return counter_list.size(); } + + size_t capacity() const { return m_capacity; } + + void clear() { return destroy_elements(); } + + void resize(size_t new_capacity) { + counter_list.reserve(new_capacity); + alpha_map.resize(next_alpha_size(new_capacity)); + m_capacity = new_capacity; + } + + // Inserts a new element or updates the count of an existing element. + // If the element exists, the count and error are increased. + // If the element doesn't exist and the capacity is not full, it inserts the new element. + // If the capacity is full, it replaces the element with the smallest count and inserts the new one. 
+ void insert(const TKey& key, uint64_t increment = 1, uint64_t error = 0) { + auto hash = counter_map.hash(key); + + if (auto* counter = find_counter(key, hash); counter) { + counter->count += increment; + counter->error += error; + percolate(counter); + return; + } + + if (UNLIKELY(size() < capacity())) { + push(std::make_unique(arena.emplace(key), increment, error, hash)); + return; + } + + auto& min = counter_list.back(); + if (increment > min->count) { + destroy_last_element(); + push(std::make_unique(arena.emplace(key), increment, error, hash)); + return; + } + + const size_t alpha_mask = alpha_map.size() - 1; + auto& alpha = alpha_map[hash & alpha_mask]; + if (alpha + increment < min->count) { + alpha += increment; + return; + } + + alpha_map[min->hash & alpha_mask] = min->count; + destroy_last_element(); + + push(std::make_unique(arena.emplace(key), alpha + increment, alpha + error, hash)); + } + + // Merges another `SpaceSaving` object into the current one. Updates counts and errors of elements. + // If the other object is full, it adds its elements to the current list and maintains sorting. 
+ void merge(const Self& rhs) { + if (!rhs.size()) { + return; + } + + uint64_t m1 = 0; + uint64_t m2 = 0; + + if (size() == capacity()) { + m1 = counter_list.back()->count; + } + + if (rhs.size() == rhs.capacity()) { + m2 = rhs.counter_list.back()->count; + } + + if (m2 > 0) { + for (auto& counter : counter_list) { + counter->count += m2; + counter->error += m2; + } + } + + for (auto& counter : boost::adaptors::reverse(rhs.counter_list)) { + size_t hash = counter_map.hash(counter->key); + if (auto* current = find_counter(counter->key, hash)) { + current->count += (counter->count - m2); + current->error += (counter->error - m2); + } else { + counter_list.push_back(std::make_unique(arena.emplace(counter->key), + counter->count + m1, + counter->error + m1, hash)); + } + } + + std::sort(counter_list.begin(), counter_list.end(), + [](const auto& l, const auto& r) { return *l > *r; }); + + if (counter_list.size() > m_capacity) { + for (size_t i = m_capacity; i < counter_list.size(); ++i) { + arena.free(counter_list[i]->key); + } + counter_list.resize(m_capacity); + } + + for (size_t i = 0; i < counter_list.size(); ++i) { + counter_list[i]->slot = i; + } + rebuild_counter_map(); + } + + // Retrieves the top-k counters, sorted by their count and error values. 
+ std::vector top_k(size_t k) const { + std::vector res; + for (auto& counter : counter_list) { + res.push_back(*counter); + if (res.size() == k) { + break; + } + } + return res; + } + + void write(BufferWritable& wb) const { + write_var_uint(size(), wb); + for (auto& counter : counter_list) { + counter->write(wb); + } + + write_var_uint(alpha_map.size(), wb); + for (auto alpha : alpha_map) { + write_var_uint(alpha, wb); + } + } + + void read(BufferReadable& rb) { + destroy_elements(); + size_t count = 0; + read_var_uint(count, rb); + + for (size_t i = 0; i < count; ++i) { + std::unique_ptr counter = std::make_unique(); + counter->read(rb); + counter->hash = counter_map.hash(counter->key); + push(std::move(counter)); + } + + read_alpha_map(rb); + } + + // Reads the alpha map data from the provided readable buffer. + void read_alpha_map(BufferReadable& rb) { + size_t alpha_size = 0; + read_var_uint(alpha_size, rb); + for (size_t i = 0; i < alpha_size; ++i) { + uint64_t alpha = 0; + read_var_uint(alpha, rb); + alpha_map.push_back(alpha); + } + } + +protected: + void push(std::unique_ptr counter) { + counter->slot = counter_list.size(); + auto* ptr = counter.get(); + counter_list.push_back(std::move(counter)); + counter_map[ptr->key] = ptr; + percolate(ptr); + } + + void percolate(Counter* counter) { + while (counter->slot > 0) { + auto& next = counter_list[counter->slot - 1]; + if (*counter > *next) { + std::swap(next->slot, counter->slot); + std::swap(counter_list[next->slot], counter_list[counter->slot]); + } else { + break; + } + } + } + +private: + void destroy_elements() { + for (auto& counter : counter_list) { + arena.free(counter->key); + } + + counter_map.clear(); + counter_list.clear(); + alpha_map.clear(); + } + + void destroy_last_element() { + auto& last_element = counter_list.back(); + counter_map.erase(last_element->key); + arena.free(last_element->key); + counter_list.pop_back(); + + ++removed_keys; + if (removed_keys * 2 > counter_map.size()) { + 
rebuild_counter_map(); + } + } + + Counter* find_counter(const TKey& key, size_t hash) { + auto it = counter_map.find(key, hash); + if (it == counter_map.end()) { + return nullptr; + } + + return it->second; + } + + void rebuild_counter_map() { + removed_keys = 0; + counter_map.clear(); + for (auto& counter : counter_list) { + counter_map[counter->key] = counter.get(); + } + } + + using CounterMap = flat_hash_map; + + CounterMap counter_map; + std::vector> counter_list; + std::vector alpha_map; + SpaceSavingArena arena; + size_t m_capacity = 0; + size_t removed_keys = 0; +}; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index ef2bbcb29964aa..2803fe16f02db6 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -139,6 +139,14 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, child_expr_name.emplace_back(_input_exprs_ctx->root()->expr_name()); } + std::vector> column_infos; + for (const auto& expr_ctx : _input_exprs_ctxs) { + const auto& root = expr_ctx->root(); + if (!root->expr_name().empty()) { + column_infos.emplace_back(root->expr_name(), root->is_constant()); + } + } + const DataTypes& argument_types = _real_argument_types.empty() ? 
tmp_argument_types : _real_argument_types; @@ -201,11 +209,15 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, _function = AggregateFunctionSimpleFactory::instance().get( _fn.name.function_name, argument_types, AggregateFunctionSimpleFactory::result_nullable_by_foreach(_data_type), - state->be_exec_version(), {.enable_decimal256 = state->enable_decimal256()}); + state->be_exec_version(), + {.enable_decimal256 = state->enable_decimal256(), + .column_infos = std::move(column_infos)}); } else { _function = AggregateFunctionSimpleFactory::instance().get( _fn.name.function_name, argument_types, _data_type->is_nullable(), - state->be_exec_version(), {.enable_decimal256 = state->enable_decimal256()}); + state->be_exec_version(), + {.enable_decimal256 = state->enable_decimal256(), + .column_infos = std::move(column_infos)}); } } if (_function == nullptr) { diff --git a/be/test/common/space_saving_test.cpp b/be/test/common/space_saving_test.cpp new file mode 100644 index 00000000000000..6b390017997982 --- /dev/null +++ b/be/test/common/space_saving_test.cpp @@ -0,0 +1,300 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/common/space_saving.h" + +#include + +#include +#include +#include +#include +#include +#include + +#include "common/logging.h" +#include "vec/common/string_ref.h" + +namespace doris::vectorized { + +class SapceSavingTest : public testing::Test { +public: + void SetUp() override {} + void TearDown() override {} + + SapceSavingTest() = default; + ~SapceSavingTest() override = default; +}; + +int getDaySeed() { + std::time_t now = std::time(nullptr); + std::tm* localTime = std::localtime(&now); + localTime->tm_sec = 0; + localTime->tm_min = 0; + localTime->tm_hour = 0; + + return static_cast(std::mktime(localTime) / (60 * 60 * 24)); +} + +std::string generateRandomIP() { + std::string part1 = "127"; + std::string part2 = "0"; + std::string part3 = "0"; + std::string part4 = std::to_string(rand() % 256); + + return part1 + "." + part2 + "." + part3 + "." + part4; +} + +int32_t generateRandomNumber() { + return rand() % 256; +} + +TEST_F(SapceSavingTest, test_space_saving_ip) { + int seed = getDaySeed(); + std::srand(seed); + + SpaceSaving space_saving(256); + std::unordered_map count_map; + + std::vector datas; + for (int32_t i = 0; i < 100000; ++i) { + datas.emplace_back(generateRandomIP()); + } + + for (auto& data : datas) { + StringRef ref(data); + space_saving.insert(ref); + count_map[ref]++; + } + + auto counts = space_saving.top_k(256); + int32_t i = 0; + int32_t j = 0; + for (auto& iter : counts) { + StringRef ref(iter.key); + EXPECT_EQ(iter.count, count_map[ref]); + i += iter.count; + j += count_map[ref]; + } + EXPECT_EQ(i, j); +} + +TEST_F(SapceSavingTest, test_space_saving_number) { + int seed = getDaySeed(); + std::srand(seed); + + SpaceSaving space_saving(256); + std::unordered_map count_map; + + std::vector datas; + for (int32_t i = 0; i < 100000; ++i) { + datas.emplace_back(generateRandomNumber()); + } + + for (auto& data : datas) { + space_saving.insert(data); + count_map[data]++; + } + + auto counts = space_saving.top_k(256); + int32_t 
i = 0; + int32_t j = 0; + for (auto& iter : counts) { + EXPECT_EQ(iter.count, count_map[iter.key]); + i += iter.count; + j += count_map[iter.key]; + } + EXPECT_EQ(i, j); +} + +TEST_F(SapceSavingTest, test_space_saving_merge) { + int seed = getDaySeed(); + std::srand(seed); + + SpaceSaving space_saving(256); + std::unordered_map count_map; + + // merge1 + std::vector datas1; + { + SpaceSaving space_saving1(256); + std::unordered_map count_map1; + for (int32_t i = 0; i < 100000; ++i) { + datas1.emplace_back(generateRandomIP()); + } + + for (auto& data : datas1) { + StringRef ref(data); + space_saving1.insert(ref); + count_map1[ref]++; + } + + space_saving.merge(space_saving1); + for (auto& iter1 : count_map1) { + auto iter = count_map.find(iter1.first); + if (iter != count_map.end()) { + iter->second += iter1.second; + } else { + count_map[iter1.first] = iter1.second; + } + } + } + + // merge2 + std::vector datas2; + { + SpaceSaving space_saving1(256); + std::unordered_map count_map1; + for (int32_t i = 0; i < 100000; ++i) { + datas2.emplace_back(generateRandomIP()); + } + + for (auto& data : datas2) { + StringRef ref(data); + space_saving1.insert(ref); + count_map1[ref]++; + } + + space_saving.merge(space_saving1); + for (auto& iter1 : count_map1) { + auto iter = count_map.find(iter1.first); + if (iter != count_map.end()) { + iter->second += iter1.second; + } else { + count_map[iter1.first] = iter1.second; + } + } + } + + auto counts = space_saving.top_k(256); + int32_t i = 0; + int32_t j = 0; + for (auto& iter : counts) { + StringRef ref(iter.key); + EXPECT_EQ(iter.count, count_map[ref]); + i += iter.count; + j += count_map[ref]; + } + EXPECT_EQ(i, j); +} + +// Test inserting beyond capacity +TEST_F(SapceSavingTest, test_space_saving_exceed_capacity) { + int seed = getDaySeed(); + std::srand(seed); + + SpaceSaving space_saving(5); // Set small capacity + for (int32_t i = 1; i <= 10; ++i) { + space_saving.insert(i); + } + + auto counts = space_saving.top_k(5); + 
EXPECT_EQ(counts.size(), 5); + EXPECT_LE(counts[4].count, counts[3].count); // Ensure it's sorted +} + +// Test merging two SpaceSaving instances +TEST_F(SapceSavingTest, test_space_saving_merge_behavior) { + int seed = getDaySeed(); + std::srand(seed); + + SpaceSaving space_saving1(10); + SpaceSaving space_saving2(10); + + for (int32_t i = 1; i <= 20; ++i) { + space_saving1.insert(i, i); // Increment with value + } + + for (int32_t i = 11; i <= 30; ++i) { + space_saving2.insert(i, i); // Increment with value + } + + space_saving1.merge(space_saving2); + auto counts = space_saving1.top_k(10); + + EXPECT_EQ(counts.size(), 10); + EXPECT_GE(counts[0].count, counts[1].count); // Ensure sorted +} + +// Test that top_k returns elements in correct order +TEST_F(SapceSavingTest, test_space_saving_top_k_order) { + int seed = getDaySeed(); + std::srand(seed); + + SpaceSaving space_saving(10); + for (int32_t i = 1; i <= 100; ++i) { + space_saving.insert(i, i); // Insert with increment equal to value + } + + auto counts = space_saving.top_k(10); + EXPECT_EQ(counts.size(), 10); + + // Check if the counts are in descending order + for (size_t i = 0; i < counts.size() - 1; ++i) { + EXPECT_GE(counts[i].count, counts[i + 1].count); + } +} + +// Test that the merging does not lose counts +TEST_F(SapceSavingTest, test_space_saving_merge_counts) { + int seed = getDaySeed(); + std::srand(seed); + + SpaceSaving space_saving1(10); + SpaceSaving space_saving2(10); + + for (int32_t i = 1; i <= 5; ++i) { + space_saving1.insert(i, i * 2); // Insert counts + } + + for (int32_t i = 1; i <= 5; ++i) { + space_saving2.insert(i + 5, i * 3); // Insert counts + } + + space_saving1.merge(space_saving2); + auto counts = space_saving1.top_k(10); + + // Validate counts + EXPECT_EQ(counts.size(), 10); + for (size_t i = 0; i < counts.size(); ++i) { + EXPECT_GE(counts[i].count, 0); // Ensure all counts are non-negative + } +} + +// Test with string keys and check behavior +TEST_F(SapceSavingTest, 
test_space_saving_string_keys) { + int seed = getDaySeed(); + std::srand(seed); + + SpaceSaving space_saving(10); + std::unordered_map count_map; + + std::vector keys = {"apple", "banana", "orange", "grape", "kiwi"}; + for (const auto& key : keys) { + space_saving.insert(StringRef(key), 1); + count_map[key]++; + } + + auto counts = space_saving.top_k(5); + EXPECT_EQ(counts.size(), 5); + + for (const auto& count : counts) { + EXPECT_EQ(count_map[count.key.to_string()], count.count); + } +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java index be4b300f96cebc..22534930cbad35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java @@ -18,6 +18,7 @@ package org.apache.doris.catalog; import org.apache.doris.nereids.trees.expressions.functions.agg.AnyValue; +import org.apache.doris.nereids.trees.expressions.functions.agg.ApproxTopK; import org.apache.doris.nereids.trees.expressions.functions.agg.ArrayAgg; import org.apache.doris.nereids.trees.expressions.functions.agg.Avg; import org.apache.doris.nereids.trees.expressions.functions.agg.AvgWeighted; @@ -95,6 +96,7 @@ public class BuiltinAggregateFunctions implements FunctionHelper { public final List aggregateFunctions = ImmutableList.of( agg(AnyValue.class, "any", "any_value"), + agg(ApproxTopK.class, "approx_top_k"), agg(ArrayAgg.class, "array_agg"), agg(Avg.class, "avg"), agg(AvgWeighted.class, "avg_weighted"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ApproxTopK.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ApproxTopK.java new file mode 100644 index 00000000000000..56f580efdec61f --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ApproxTopK.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.agg; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.StringType; +import org.apache.doris.nereids.types.coercion.AnyDataType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * AggregateFunction 'approx_top_k'. This class is generated by GenerateFunction. + */ +public class ApproxTopK extends NullableAggregateFunction + implements ExplicitlyCastableSignature { + + public static final List SIGNATURES = ImmutableList.of( + FunctionSignature.ret(StringType.INSTANCE) + .varArgs(AnyDataType.INSTANCE_WITHOUT_INDEX) + ); + + public ApproxTopK(Expression... 
varArgs) { + this(false, varArgs); + } + + public ApproxTopK(boolean distinct, Expression... varArgs) { + this(distinct, false, varArgs); + } + + public ApproxTopK(boolean distinct, boolean alwaysNullable, Expression... varArgs) { + super("approx_top_k", distinct, alwaysNullable, varArgs); + } + + @Override + public void checkLegalityBeforeTypeCoercion() { + if (arity() < 1) { + throw new AnalysisException( + "Function requires at least 1 parameter: " + this.toSql()); + } + } + + @Override + public ApproxTopK withDistinctAndChildren(boolean distinct, List children) { + Preconditions.checkArgument(children.size() >= 1); + return new ApproxTopK(distinct, alwaysNullable, children.toArray(new Expression[0])); + } + + @Override + public NullableAggregateFunction withAlwaysNullable(boolean alwaysNullable) { + return new ApproxTopK(distinct, alwaysNullable, children.toArray(new Expression[0])); + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitApproxTopK(this, context); + } + + @Override + public List getSignatures() { + return SIGNATURES; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java index 4e7870609e1c10..3a93a2164bff4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java @@ -19,6 +19,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.agg.AnyValue; +import org.apache.doris.nereids.trees.expressions.functions.agg.ApproxTopK; import org.apache.doris.nereids.trees.expressions.functions.agg.ArrayAgg; import org.apache.doris.nereids.trees.expressions.functions.agg.Avg; 
import org.apache.doris.nereids.trees.expressions.functions.agg.AvgWeighted; @@ -359,4 +360,8 @@ default R visitJavaUdaf(JavaUdaf javaUdaf, C context) { return visitAggregateFunction(javaUdaf, context); } + default R visitApproxTopK(ApproxTopK approxTopK, C context) { + return visitNullableAggregateFunction(approxTopK, context); + } + } diff --git a/regression-test/data/inverted_index_p0/test_index_approx_top_k.out b/regression-test/data/inverted_index_p0/test_index_approx_top_k.out new file mode 100644 index 00000000000000..0ed4fc0879eaf0 --- /dev/null +++ b/regression-test/data/inverted_index_p0/test_index_approx_top_k.out @@ -0,0 +1,133 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +141.78.0.0 67 +47.135.0.0 62 +12.0.0.0 54 +47.0.0.0 44 +23.0.0.0 40 +44.135.0.0 37 +8.0.0.0 31 +3.0.0.0 29 +45.135.0.0 29 +65.0.0.0 27 + +-- !sql -- +[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n {"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- +[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n {"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- +[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n 
{"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- +[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n {"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- +[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n {"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- +47.0.0.0 304 0 41 +44.135.0.0 304 0 25 +141.78.0.0 304 0 24 +3.0.0.0 304 0 20 +55.0.0.0 200 985 12 +141.78.0.0 200 169 8 +247.37.0.0 304 0 8 +46.0.0.0 304 0 6 +8.0.0.0 304 0 6 +41.0.0.0 304 0 5 + +-- !sql -- +[\n {"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n {"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +[\n 
{"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n {"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +[\n {"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n {"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +[\n {"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n {"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n 
{"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +[\n {"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n {"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +{"clientip":"47.0.0.0","status":"304","size":"0","count":"41"} +{"clientip":"44.135.0.0","status":"304","size":"0","count":"25"} +{"clientip":"141.78.0.0","status":"304","size":"0","count":"24"} +{"clientip":"3.0.0.0","status":"304","size":"0","count":"20"} +{"clientip":"55.0.0.0","status":"200","size":"985","count":"12"} +{"clientip":"247.37.0.0","status":"304","size":"0","count":"8"} +{"clientip":"141.78.0.0","status":"200","size":"169","count":"8"} +{"clientip":"8.0.0.0","status":"304","size":"0","count":"6"} +{"clientip":"46.0.0.0","status":"304","size":"0","count":"6"} +{"clientip":"41.0.0.0","status":"304","size":"0","count":"5"} + +-- !sql -- +141.78.0.0 67 +47.135.0.0 62 +12.0.0.0 54 +47.0.0.0 44 +23.0.0.0 40 +44.135.0.0 37 +8.0.0.0 31 +3.0.0.0 29 +45.135.0.0 29 +65.0.0.0 27 + +-- !sql -- +[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n {"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- 
+[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n {"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- +[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n {"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- +[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n {"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- +[\n {"clientip":"141.78.0.0","count":"67"},\n {"clientip":"47.135.0.0","count":"62"},\n {"clientip":"12.0.0.0","count":"54"},\n {"clientip":"47.0.0.0","count":"44"},\n {"clientip":"23.0.0.0","count":"40"},\n {"clientip":"44.135.0.0","count":"37"},\n {"clientip":"8.0.0.0","count":"31"},\n {"clientip":"45.135.0.0","count":"29"},\n {"clientip":"3.0.0.0","count":"29"},\n {"clientip":"65.0.0.0","count":"27"}\n] + +-- !sql -- +47.0.0.0 304 0 41 +44.135.0.0 304 0 25 +141.78.0.0 304 0 24 +3.0.0.0 304 0 20 +55.0.0.0 200 985 12 +141.78.0.0 200 169 8 +247.37.0.0 304 0 8 +46.0.0.0 304 0 6 +8.0.0.0 304 0 6 +41.0.0.0 304 0 5 + +-- !sql -- +[\n {"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n 
{"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +[\n {"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n {"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +[\n {"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n {"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +[\n 
{"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n {"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +[\n {"clientip":"47.0.0.0","status":"304","size":"0","count":"41"},\n {"clientip":"44.135.0.0","status":"304","size":"0","count":"25"},\n {"clientip":"141.78.0.0","status":"304","size":"0","count":"24"},\n {"clientip":"3.0.0.0","status":"304","size":"0","count":"20"},\n {"clientip":"55.0.0.0","status":"200","size":"985","count":"12"},\n {"clientip":"247.37.0.0","status":"304","size":"0","count":"8"},\n {"clientip":"141.78.0.0","status":"200","size":"169","count":"8"},\n {"clientip":"8.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"46.0.0.0","status":"304","size":"0","count":"6"},\n {"clientip":"41.0.0.0","status":"304","size":"0","count":"5"}\n] + +-- !sql -- +{"clientip":"47.0.0.0","status":"304","size":"0","count":"41"} +{"clientip":"44.135.0.0","status":"304","size":"0","count":"25"} +{"clientip":"141.78.0.0","status":"304","size":"0","count":"24"} +{"clientip":"3.0.0.0","status":"304","size":"0","count":"20"} +{"clientip":"55.0.0.0","status":"200","size":"985","count":"12"} +{"clientip":"247.37.0.0","status":"304","size":"0","count":"8"} +{"clientip":"141.78.0.0","status":"200","size":"169","count":"8"} +{"clientip":"8.0.0.0","status":"304","size":"0","count":"6"} +{"clientip":"46.0.0.0","status":"304","size":"0","count":"6"} 
+{"clientip":"41.0.0.0","status":"304","size":"0","count":"5"} + diff --git a/regression-test/suites/inverted_index_p0/test_index_approx_top_k.groovy b/regression-test/suites/inverted_index_p0/test_index_approx_top_k.groovy new file mode 100644 index 00000000000000..638d2316c463a0 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/test_index_approx_top_k.groovy @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+// Records exact GROUP BY top-10 counts and approx_top_k results over the same data, once per debug_skip_fold_constant setting.
+suite("test_index_approx_top_k", "p0"){
+    def tableName = "test_index_approx_top_k"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    // Creates the table that every query below reads from.
+    def create_table = {table_name ->
+        sql """
+        CREATE TABLE ${table_name} (
+            `@timestamp` int(11) NULL COMMENT "",
+            `clientip` text NULL COMMENT "",
+            `request` text NULL COMMENT "",
+            `status` int NULL COMMENT "",
+            `size` int NULL COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`@timestamp`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY RANDOM BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "disable_auto_compaction" = "true"
+        );
+        """
+    }
+    // Stream-loads file_name into table_name and asserts on the load result (see the check callback).
+    def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
+                        expected_succ_rows = -1, load_to_single_tablet = 'true' ->
+
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'label', label + "_" + UUID.randomUUID().toString()
+            set 'read_json_by_line', read_flag
+            set 'format', format_flag
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+            if (expected_succ_rows >= 0) {
+                set 'max_filter_ratio', '1'
+            }
+
+            // If a check callback is declared, the default check is skipped,
+            // so every condition has to be verified here.
+            check { result, exception, startTime, endTime ->
+                if (ignore_failure && expected_succ_rows < 0) { return }
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                if (expected_succ_rows >= 0) {
+                    assertEquals(json.NumberLoadedRows, expected_succ_rows)
+                } else {
+                    assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                    assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+                }
+            }
+        }
+    }
+
+    try {
+        create_table(tableName)
+
+        load_httplogs_data.call(tableName, 'test_index_approx_top_k', 'true', 'json', 'documents-1000.json')
+
+        sql "sync"
+
+        sql """ set enable_common_expr_pushdown = true """
+        // Pass 1: debug_skip_fold_constant = true.
+        sql """ set debug_skip_fold_constant = true; """
+        qt_sql """ select clientip, count(*) as count from ${tableName} group by clientip order by count desc, clientip asc limit 10; """
+        qt_sql """ select approx_top_k(clientip) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, 10) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, 10, 300) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, 5 + 5, 300) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, abs(-10), 300) from ${tableName}; """
+
+        qt_sql """ select clientip, status, size, count(*) as count from ${tableName} group by clientip, status, size order by count desc, clientip asc limit 10; """
+        qt_sql """ select approx_top_k(clientip, status, size) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, status, size, 10) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, status, size, 10, 300) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, status, size, 5 + 5, 300) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, status, size, abs(-10), 300) from ${tableName}; """
+        // The query with a negative second argument is expected to throw; result1 stays "fail" when it does.
+        def result1 = "fail"
+        try {
+            sql " select approx_top_k(clientip, -10, 300) from ${tableName}; "
+            result1 = 'success'
+        } catch(Exception ex) {
+            logger.info("error msg: " + ex)
+        }
+        assertEquals('fail', result1)
+
+        qt_sql """
+            WITH tmp AS (
+                SELECT approx_top_k(clientip, status, size, 10, 300) AS json_output FROM ${tableName}
+            )
+            SELECT
+                e1
+            FROM
+                tmp
+            LATERAL VIEW explode_json_array_json(json_output) tmp1 AS e1;
+        """
+        // Pass 2: the same queries with debug_skip_fold_constant = false; the recorded output is expected to match pass 1.
+        sql """ set debug_skip_fold_constant = false; """
+        qt_sql """ select clientip, count(*) as count from ${tableName} group by clientip order by count desc, clientip asc limit 10; """
+        qt_sql """ select approx_top_k(clientip) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, 10) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, 10, 300) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, 5 + 5, 300) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, abs(-10), 300) from ${tableName}; """
+
+        qt_sql """ select clientip, status, size, count(*) as count from ${tableName} group by clientip, status, size order by count desc, clientip asc limit 10; """
+        qt_sql """ select approx_top_k(clientip, status, size) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, status, size, 10) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, status, size, 10, 300) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, status, size, 5 + 5, 300) from ${tableName}; """
+        qt_sql """ select approx_top_k(clientip, status, size, abs(-10), 300) from ${tableName}; """
+        // Same negative-argument check as in pass 1.
+        def result2 = "fail"
+        try {
+            sql " select approx_top_k(clientip, -10, 300) from ${tableName}; "
+            result2 = 'success'
+        } catch(Exception ex) {
+            logger.info("error msg: " + ex)
+        }
+        assertEquals('fail', result2)
+
+        qt_sql """
+            WITH tmp AS (
+                SELECT approx_top_k(clientip, status, size, 10, 300) AS json_output FROM ${tableName}
+            )
+            SELECT
+                e1
+            FROM
+                tmp
+            LATERAL VIEW explode_json_array_json(json_output) tmp1 AS e1;
+        """
+    } finally {
+        // Cleanup is left disabled; to enable: try_sql("DROP TABLE IF EXISTS ${tableName}")
+    }
+}