From 0c55b7cc139736fe8c0609e885596598ff05e7be Mon Sep 17 00:00:00 2001 From: yangwenbo6 Date: Fri, 9 Oct 2020 23:29:23 +0800 Subject: [PATCH 1/5] topn --- be/src/common/daemon.cpp | 2 + be/src/exprs/CMakeLists.txt | 3 +- be/src/exprs/topn_function.cpp | 119 ++++++++ be/src/exprs/topn_function.h | 47 ++++ be/src/util/CMakeLists.txt | 1 + be/src/util/topn_counter.cpp | 138 ++++++++++ be/src/util/topn_counter.h | 182 ++++++++++++ be/test/exprs/CMakeLists.txt | 1 + be/test/exprs/topn_function_test.cpp | 259 ++++++++++++++++++ be/test/exprs/zipf_distribution.h | 120 ++++++++ .../sql-functions/aggregate-functions/topn.md | 61 +++++ .../sql-functions/aggregate-functions/topn.md | 60 ++++ .../doris/analysis/FunctionCallExpr.java | 25 ++ .../org/apache/doris/catalog/FunctionSet.java | 84 ++++++ gensrc/proto/olap_common.proto | 11 + 15 files changed, 1112 insertions(+), 1 deletion(-) create mode 100644 be/src/exprs/topn_function.cpp create mode 100644 be/src/exprs/topn_function.h create mode 100644 be/src/util/topn_counter.cpp create mode 100644 be/src/util/topn_counter.h create mode 100644 be/test/exprs/topn_function_test.cpp create mode 100644 be/test/exprs/zipf_distribution.h create mode 100644 docs/en/sql-reference/sql-functions/aggregate-functions/topn.md create mode 100644 docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index c05ef99146122f..e57d027a5c94b2 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -41,6 +41,7 @@ #include "exprs/string_functions.h" #include "exprs/time_operators.h" #include "exprs/timestamp_functions.h" +#include "exprs/topn_function.h" #include "exprs/utility_functions.h" #include "geo/geo_functions.h" #include "olap/options.h" @@ -261,6 +262,7 @@ void Daemon::init(int argc, char** argv, const std::vector& paths) { BitmapFunctions::init(); HllFunctions::init(); HashFunctions::init(); + TopNFunctions::init(); LOG(INFO) << 
CpuInfo::debug_string(); LOG(INFO) << DiskInfo::debug_string(); diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index 91947245c1d36f..bc179e5a880a9d 100644 --- a/be/src/exprs/CMakeLists.txt +++ b/be/src/exprs/CMakeLists.txt @@ -65,4 +65,5 @@ add_library(Exprs new_agg_fn_evaluator.cc bitmap_function.cpp hll_function.cpp - grouping_sets_functions.cpp) + grouping_sets_functions.cpp + topn_function.cpp) diff --git a/be/src/exprs/topn_function.cpp b/be/src/exprs/topn_function.cpp new file mode 100644 index 00000000000000..5ea160f184f0eb --- /dev/null +++ b/be/src/exprs/topn_function.cpp @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exprs/topn_function.h" +#include "util/topn_counter.h" +#include "util/slice.h" + +namespace doris { + +using doris_udf::AnyVal; + +void TopNFunctions::init() { +} + +void TopNFunctions::topn_init(FunctionContext* ctx, StringVal *dst) { + dst->is_null = false; + dst->len = sizeof(TopNCounter); + const AnyVal* space_expand_rate_val = ctx->get_constant_arg(2); + if (space_expand_rate_val != nullptr) { + int32_t space_expand_rate = reinterpret_cast(space_expand_rate_val)->val; + dst->ptr = (uint8_t *) new TopNCounter(space_expand_rate); + return; + } + dst->ptr = (uint8_t *) new TopNCounter(); +} + +template +void TopNFunctions::topn_update(FunctionContext*, const T& src, const IntVal& topn, StringVal* dst) { + if (src.is_null) { + return; + } + auto* dst_topn = reinterpret_cast(dst->ptr); + dst_topn->set_top_num(topn.val); + dst_topn->add_item(src); +} + +template +void TopNFunctions::topn_update(FunctionContext *, const T &src, const IntVal &topn, const IntVal &space_expand_rate, + StringVal *dst) { + if (src.is_null) { + return; + } + auto* dst_topn = reinterpret_cast(dst->ptr); + dst_topn->set_top_num(topn.val); + dst_topn->add_item(src); +} + +void TopNFunctions::topn_merge(FunctionContext* ctx, const StringVal &src, StringVal *dst) { + if (src.is_null) { + return; + } + auto* dst_topn = reinterpret_cast(dst->ptr); + dst_topn->merge(TopNCounter(Slice(src.ptr, src.len))); +} + +StringVal TopNFunctions::topn_serialize(FunctionContext *ctx, const StringVal &src) { + auto* src_topn = reinterpret_cast(src.ptr); + + std::string buffer; + src_topn->serialize(&buffer); + StringVal result(ctx, buffer.size()); + memcpy(result.ptr, buffer.data(), buffer.size()); + delete src_topn; + return result; +} + +StringVal TopNFunctions::topn_finalize(FunctionContext* ctx, const StringVal &src) { + auto* src_topn = reinterpret_cast(src.ptr); + std::string result_str; + src_topn->finalize(result_str); + + StringVal result(ctx, result_str.size()); + memcpy(result.ptr, 
result_str.data(), result_str.size()); + + delete src_topn; + return result; +} + +template void TopNFunctions::topn_update(FunctionContext*, const BooleanVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const TinyIntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const SmallIntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const BigIntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const FloatVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const DoubleVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const StringVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext*, const DateTimeVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext*, const LargeIntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext*, const DecimalVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext*, const DecimalV2Val&, const IntVal&, StringVal*); + +template void TopNFunctions::topn_update(FunctionContext*, const BooleanVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const TinyIntVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const SmallIntVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const IntVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const BigIntVal&, const IntVal&, const IntVal&, StringVal*); +template void 
TopNFunctions::topn_update(FunctionContext *, const FloatVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const DoubleVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext *, const StringVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext*, const DateTimeVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext*, const LargeIntVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext*, const DecimalVal&, const IntVal&, const IntVal&, StringVal*); +template void TopNFunctions::topn_update(FunctionContext*, const DecimalV2Val&, const IntVal&, const IntVal&, StringVal*); + +} \ No newline at end of file diff --git a/be/src/exprs/topn_function.h b/be/src/exprs/topn_function.h new file mode 100644 index 00000000000000..b50700e8d033a0 --- /dev/null +++ b/be/src/exprs/topn_function.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef DORIS_BE_SRC_EXPRS_TOPN_FUNCTION_H +#define DORIS_BE_SRC_EXPRS_TOPN_FUNCTION_H + +#include "udf/udf.h" + +namespace doris { + +class TopNFunctions { +public: + static void init(); + + static void topn_init(FunctionContext*, StringVal* dst); + + template + static void topn_update(FunctionContext*, const T& src, const IntVal& topn, StringVal* dst); + + template + static void topn_update(FunctionContext*, const T& src, const IntVal& topn, const IntVal& space_expand_rate, + StringVal* dst); + + static void topn_merge(FunctionContext*,const StringVal& src, StringVal* dst); + + static StringVal topn_serialize(FunctionContext* ctx, const StringVal& src); + + static StringVal topn_finalize(FunctionContext*, const StringVal& src); +}; + +} + +#endif //DORIS_BE_SRC_EXPRS_TOPN_FUNCTION_H diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index adb21a66980f4f..ce1cd196b91bb1 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -102,6 +102,7 @@ set(UTIL_FILES brpc_stub_cache.cpp zlib.cpp pprof_utils.cpp + topn_counter.cpp ) if (WITH_MYSQL) diff --git a/be/src/util/topn_counter.cpp b/be/src/util/topn_counter.cpp new file mode 100644 index 00000000000000..77941f453ea960 --- /dev/null +++ b/be/src/util/topn_counter.cpp @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include "gen_cpp/olap_common.pb.h" +#include "topn_counter.h" +#include "slice.h" + +namespace doris { + +void TopNCounter::add_item(const std::string& item, uint64_t incrementCount) { + auto iter = _counter_map->find(item); + if (iter != _counter_map->end()) { + iter->second.add_count(incrementCount); + } else { + _counter_map->insert(std::make_pair(item, Counter(item, incrementCount))); + } + _ordered = false; +} + +void TopNCounter::serialize(std::string* buffer) { + sort_retain(_capacity); + PTopNCounter topn_counter; + topn_counter.set_top_num(_top_num); + topn_counter.set_space_expand_rate(_space_expand_rate); + for(std::vector::const_iterator it = _counter_vec->begin(); it != _counter_vec->end(); ++it) + { + PCounter* counter = topn_counter.add_counter(); + counter->set_item(it->get_item()); + counter->set_count(it->get_count()); + } + topn_counter.SerializeToString(buffer); +} + +bool TopNCounter::deserialize(const doris::Slice &src) { + PTopNCounter topn_counter; + if (!topn_counter.ParseFromArray(src.data, src.size)) { + LOG(WARNING) << "topn counter deserialize failed"; + return false; + } + + _space_expand_rate = topn_counter.space_expand_rate(); + set_top_num(topn_counter.top_num()); + for (int i = 0; i < topn_counter.counter_size(); ++i) { + const PCounter& counter = topn_counter.counter(i); + _counter_map->insert(std::make_pair(counter.item(), Counter(counter.item(), counter.count()))); + _counter_vec->emplace_back(counter.item(), counter.count()); + } + _ordered = true; + return true; +} + +void TopNCounter::sort_retain(uint32_t capacity) { + _counter_vec->clear(); + sort_retain(capacity, _counter_vec); + _ordered = true; +} + +void TopNCounter::sort_retain(uint32_t capacity, std::vector* sort_vec) { + for(std::unordered_map::const_iterator it = _counter_map->begin(); it != _counter_map->end(); ++it) { + 
sort_vec->emplace_back(it->second.get_item(), it->second.get_count()); + } + + std::sort(sort_vec->begin(), sort_vec->end(), TopNComparator()); + if (sort_vec->size() > capacity) { + for (uint32_t i = 0, n = sort_vec->size() - capacity; i < n; ++i) { + auto &counter = sort_vec->back(); + _counter_map->erase(counter.get_item()); + sort_vec->pop_back(); + } + } +} + +// Based on the parallel version of the Space Saving algorithm as described in: +// A parallel space saving algorithm for frequent items and the Hurwitz zeta distribution by Massimo Cafaro, et al. +void TopNCounter::merge(doris::TopNCounter &&other) { + if (other._counter_map->size() == 0) { + return; + } + + _space_expand_rate = other._space_expand_rate; + set_top_num(other._top_num); + bool this_full = _counter_map->size() >= _capacity; + bool another_full = other._counter_map->size() >= other._capacity; + + uint64_t m1 = this_full ? _counter_vec->back().get_count() : 0; + uint64_t m2 = another_full ? other._counter_vec->back().get_count() : 0; + + if (another_full == true) { + for (auto &entry : *(this->_counter_map)) { + entry.second.add_count(m2); + } + } + + for (auto &other_entry : *(other._counter_map)) { + auto itr = this->_counter_map->find(other_entry.first); + if (itr != _counter_map->end()) { + itr->second.add_count(other_entry.second.get_count() - m2); + } else { + this->_counter_map->insert(std::make_pair(other_entry.first, + Counter(other_entry.first,other_entry.second.get_count() + m1))); + } + } + _ordered = false; + sort_retain(_capacity); +} + +void TopNCounter::finalize(std::string& finalize_str) { + if (!_ordered) { + sort_retain(_top_num); + } + std::ostringstream oss; + uint32_t k = 0; + for (std::vector::const_iterator it = _counter_vec->begin(); it != _counter_vec->end() && k < _top_num; ++it, ++k) { + oss << it->get_item() << ":" << it->get_count() << ", "; + } + finalize_str = oss.str(); + // remove last ', ' char + if (finalize_str.size() > 0) { + 
finalize_str.erase(finalize_str.size()-2); + } +} + +} \ No newline at end of file diff --git a/be/src/util/topn_counter.h b/be/src/util/topn_counter.h new file mode 100644 index 00000000000000..f8dc584dc9d7b2 --- /dev/null +++ b/be/src/util/topn_counter.h @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef DORIS_BE_SRC_UTI_TOPN_COUNTER_H +#define DORIS_BE_SRC_UTI_TOPN_COUNTER_H + +#include +#include + +#include "common/logging.h" +#include "runtime/datetime_value.h" +#include "runtime/decimal_value.h" +#include "runtime/decimalv2_value.h" +#include "runtime/large_int_value.h" +#include "udf/udf.h" + +namespace doris { + +static const uint32_t DEFAULT_SPACE_EXPAND_RATE = 50; + +class Slice; + +class Counter { +public: + Counter() = default; + + Counter(const std::string& item, uint64_t count) : _item(item), _count(count) {} + + uint64_t get_count() const { + return _count; + } + + const std::string& get_item() const { + return _item; + } + + void add_count(uint64_t count) { + _count += count; + } + + bool operator == (const Counter& other) { + if(_item.compare(other._item) != 0) { + return false; + } + if (_count != other._count) { + return false; + } + return true; + } + +private: + std::string _item; + uint64_t _count; +}; + + +// Refer to TopNCounter.java in https://github.com/apache/kylin +// Based on the Space-Saving algorithm and the Stream-Summary data structure as described in: +// Efficient Computation of Frequent and Top-k Elements in Data Streams by Metwally, Agrawal, and Abbadi +class TopNCounter { +public: + TopNCounter(uint32_t space_expand_rate = DEFAULT_SPACE_EXPAND_RATE) : + _top_num(0), _space_expand_rate(space_expand_rate), _capacity(0), _ordered(false), + _counter_map(new std::unordered_map(_capacity)), + _counter_vec(new std::vector(_capacity)){} + + TopNCounter(const Slice& src) : + _top_num(0), _space_expand_rate(0), _capacity(0), _ordered(false), + _counter_map(new std::unordered_map(_capacity)), + _counter_vec(new std::vector(_capacity)) { + bool res = deserialize(src); + DCHECK(res); + } + + ~TopNCounter() { + delete _counter_map; + delete _counter_vec; + } + + template + void add_item(const T& item) { + add_item(item, 1); + } + + void add_item(const BooleanVal& item, uint64_t incrementCount) { + add_item_numeric(item, 
incrementCount); + } + void add_item(const TinyIntVal& item, uint64_t incrementCount) { + add_item_numeric(item, incrementCount); + } + void add_item(const SmallIntVal& item, uint64_t incrementCount) { + add_item_numeric(item, incrementCount); + } + void add_item(const IntVal& item, uint64_t incrementCount) { + add_item_numeric(item, incrementCount); + } + void add_item(const BigIntVal& item, uint64_t incrementCount) { + add_item_numeric(item, incrementCount); + } + void add_item(const FloatVal& item, uint64_t incrementCount) { + add_item_numeric(item, incrementCount); + } + void add_item(const DoubleVal& item, uint64_t incrementCount) { + add_item_numeric(item, incrementCount); + } + void add_item(const StringVal& item, uint64_t incrementCount) { + add_item(std::string((char*) item.ptr, item.len), incrementCount); + } + void add_item(const DateTimeVal& item, uint64_t incrementCount) { + char str[MAX_DTVALUE_STR_LEN]; + DateTimeValue::from_datetime_val(item).to_string(str); + add_item(std::string(str), incrementCount); + } + void add_item(const LargeIntVal& item, uint64_t incrementCount) { + add_item(LargeIntValue::to_string(item.val), incrementCount); + } + void add_item(const DecimalVal& item, uint64_t incrementCount) { + add_item(DecimalValue::from_decimal_val(item).to_string(), incrementCount); + } + void add_item(const DecimalV2Val& item, uint64_t incrementCount) { + add_item(DecimalV2Value::from_decimal_val(item).to_string(), incrementCount); + } + + template + void add_item_numeric(const T& item, uint64_t incrementCount) { + add_item(std::to_string(item.val), incrementCount); + } + + void add_item(const std::string& item, uint64_t incrementCount); + + void serialize(std::string* buffer); + + bool deserialize(const Slice& src); + + void merge(doris::TopNCounter&& other); + + // Sort counter by count value and record it in _counter_vec + void sort_retain(uint32_t capacity); + + void sort_retain(uint32_t capacity, std::vector* sort_vec); + + void 
finalize(std::string&); + + void set_top_num(uint32_t top_num) { + _top_num = top_num; + _capacity = top_num * _space_expand_rate; + } + +private: + uint32_t _top_num; + uint32_t _space_expand_rate; + uint64_t _capacity; + bool _ordered; + std::unordered_map* _counter_map; + std::vector* _counter_vec; +}; + +class TopNComparator +{ +public: + bool operator () (const Counter& s1, const Counter& s2) + { + return s1.get_count() > s2.get_count(); + } +}; +} + +#endif //DORIS_BE_SRC_UTI_TOPN_COUNTER_H diff --git a/be/test/exprs/CMakeLists.txt b/be/test/exprs/CMakeLists.txt index fef817236478fa..8c393aa4ca4d0d 100644 --- a/be/test/exprs/CMakeLists.txt +++ b/be/test/exprs/CMakeLists.txt @@ -34,4 +34,5 @@ ADD_BE_TEST(hll_function_test) ADD_BE_TEST(encryption_functions_test) #ADD_BE_TEST(in-predicate-test) ADD_BE_TEST(math_functions_test) +ADD_BE_TEST(topn_function_test) diff --git a/be/test/exprs/topn_function_test.cpp b/be/test/exprs/topn_function_test.cpp new file mode 100644 index 00000000000000..89de53701e7adb --- /dev/null +++ b/be/test/exprs/topn_function_test.cpp @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exprs/anyval_util.h" +#include "exprs/topn_function.h" +#include "util/topn_counter.h" +#include "testutil/function_utils.h" +#include "zipf_distribution.h" + +#include +#include + + +namespace doris { + +static const uint32_t TOPN_NUM = 100; +static const uint32_t TOTAL_RECORDS = 1000000; +static const uint32_t PARALLEL = 10; + +std::string gen_random(const int len) { + std::string possible_characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + std::random_device rd; + std::mt19937 generator(rd()); + std::uniform_int_distribution<> dist(0, possible_characters.size()-1); + + std::string rand_str(len, '\0'); + for(auto& dis: rand_str) { + dis = possible_characters[dist(generator)]; + } + return rand_str; +} + +class TopNFunctionsTest : public testing::Test { +public: + TopNFunctionsTest() = default; + + void SetUp() { + utils = new FunctionUtils(); + ctx = utils->get_fn_ctx(); + } + + void TearDown() { + delete utils; + } + +private: + FunctionUtils *utils; + FunctionContext *ctx; +}; + +void update_accuracy_map(const std::string& item, std::unordered_map& accuracy_map) { + if (accuracy_map.find(item) != accuracy_map.end()) { + ++accuracy_map[item]; + } else { + accuracy_map.insert(std::make_pair(item, 1)); + } +} + +void topn_single(FunctionContext* ctx, std::string& random_str, StringVal& dst, std::unordered_map& accuracy_map){ + TopNFunctions::topn_update(ctx, StringVal(((uint8_t*) random_str.data()), random_str.length()), TOPN_NUM, &dst); + update_accuracy_map(random_str, accuracy_map); +} + +void test_topn_accuracy(FunctionContext* ctx, int key_space, int space_expand_rate, double zipf_distribution_exponent) { + LOG(INFO) << "topn accuracy : " << "key space : " << key_space << " , space_expand_rate : " << space_expand_rate << + " , zf exponent : " << zipf_distribution_exponent; + std::unordered_map accuracy_map; + // prepare random data + std::vector random_strs(key_space); + for (uint32_t i = 0; i < key_space; ++i) { + 
random_strs[i] = gen_random(10); + } + + zipf_distribution zf(key_space, zipf_distribution_exponent); + std::random_device rd; + std::mt19937 gen(rd()); + + StringVal topn_column("placeholder"); + IntVal topn_num_column(TOPN_NUM); + IntVal space_expand_rate_column(space_expand_rate); + std::vector const_vals; + const_vals.push_back(&topn_column); + const_vals.push_back(&topn_num_column); + const_vals.push_back(&space_expand_rate_column); + ctx->impl()->set_constant_args(const_vals); + // Compute topN in parallel + StringVal dst; + TopNFunctions::topn_init(ctx, &dst); + + StringVal single_dst_str[PARALLEL]; + for (uint32_t i = 0; i < PARALLEL; ++i) { + TopNFunctions::topn_init(ctx, &single_dst_str[i]); + } + + std::random_device random_rd; + std::mt19937 random_gen(random_rd()); + std::uniform_int_distribution<> dist(0, PARALLEL-1); + for (uint32_t i = 0; i < TOTAL_RECORDS; ++i) { + // generate zipf_distribution + uint32_t index = zf(gen); + // choose one single topn to update + topn_single(ctx, random_strs[index], single_dst_str[dist(random_gen)], accuracy_map); + } + + for (uint32_t i = 0; i < PARALLEL; ++i) { + StringVal serialized_str = TopNFunctions::topn_serialize(ctx, single_dst_str[i]); + TopNFunctions::topn_merge(ctx, serialized_str, &dst); + } + + // get accuracy result + std::vector accuracy_sort_vec; + for(std::unordered_map::const_iterator it = accuracy_map.begin(); it != accuracy_map.end(); ++it) { + accuracy_sort_vec.emplace_back(it->first, it->second); + } + std::sort(accuracy_sort_vec.begin(), accuracy_sort_vec.end(), TopNComparator()); + + // get topn result + TopNCounter* topn_dst = reinterpret_cast(dst.ptr); + std::vector topn_sort_vec; + topn_dst->sort_retain(TOPN_NUM, &topn_sort_vec); + + uint32_t error = 0; + for (uint32_t i = 0; i < TOPN_NUM; ++i) { + Counter& accuracy_counter = accuracy_sort_vec[i]; + Counter& topn_counter = topn_sort_vec[i]; + if (accuracy_counter.get_count() != topn_counter.get_count()) { + ++error; + LOG(INFO) << 
"Failed"; + LOG(INFO) << "accuracy counter : (" << accuracy_counter.get_item() << ", " << accuracy_counter.get_count() << ")"; + LOG(INFO) << "topn counter : (" << topn_counter.get_item() << ", " << topn_counter.get_count() << ")"; + } + } + LOG(INFO) << "Total errors : " << error; + TopNFunctions::topn_finalize(ctx, dst); +} + +TEST_F(TopNFunctionsTest, topn_accuracy) { + std::vector key_space_vec ({1000, 10000, 100000, 500000}); + std::vector space_expand_rate_vec({20, 50, 100}); + std::vector zipf_distribution_exponent_vec({0.5, 0.6, 1.0}); + for (auto ket_space : key_space_vec) { + for (auto space_expand_rate : space_expand_rate_vec) { + for (auto zipf_distribution_exponent : zipf_distribution_exponent_vec) { + test_topn_accuracy(ctx, ket_space, space_expand_rate, zipf_distribution_exponent); + } + } + } + +} + +TEST_F(TopNFunctionsTest, topn_update) { + StringVal dst; + TopNFunctions::topn_init(ctx, &dst); + StringVal src1("a"); + for (uint32_t i = 0; i < 10; ++i) { + TopNFunctions::topn_update(ctx, src1, 2, &dst); + } + + StringVal src2("b"); + TopNFunctions::topn_update(ctx, src2, 2, &dst); + TopNFunctions::topn_update(ctx, src2, 2, &dst); + + StringVal src3("c"); + TopNFunctions::topn_update(ctx, src3, 2, &dst); + + StringVal result = TopNFunctions::topn_finalize(ctx, dst); + StringVal expected("a:10, b:2"); + ASSERT_EQ(expected, result); +} + +TEST_F(TopNFunctionsTest, topn_merge) { + StringVal dst1; + TopNFunctions::topn_init(ctx, &dst1); + StringVal dst2; + TopNFunctions::topn_init(ctx, &dst2); + + StringVal src1("a"); + for (uint32_t i = 0; i < 10; ++i) { + TopNFunctions::topn_update(ctx, src1, 2, &dst1); + TopNFunctions::topn_update(ctx, src1, 2, &dst2); + } + StringVal src2("b"); + for (uint32_t i = 0; i < 8; ++i) { + TopNFunctions::topn_update(ctx, src2, 2, &dst1); + } + StringVal src3("c"); + for (uint32_t i = 0; i < 6; ++i) { + TopNFunctions::topn_update(ctx, src3, 2, &dst2); + } + + StringVal val1 = TopNFunctions::topn_serialize(ctx, dst1); + 
StringVal val2 = TopNFunctions::topn_serialize(ctx, dst2); + + StringVal dst; + TopNFunctions::topn_init(ctx, &dst); + TopNFunctions::topn_merge(ctx, val1, &dst); + TopNFunctions::topn_merge(ctx, val2, &dst); + StringVal result = TopNFunctions::topn_finalize(ctx, dst); + StringVal expected("a:20, b:8"); + ASSERT_EQ(expected, result); +} + +TEST_F(TopNFunctionsTest, test_null_value) { + StringVal dst1; + TopNFunctions::topn_init(ctx, &dst1); + + for (uint32_t i = 0; i < 10; ++i) { + TopNFunctions::topn_update(ctx, IntVal::null(), 2, &dst1); + } + StringVal serialized = TopNFunctions::topn_serialize(ctx, dst1); + + StringVal dst2; + TopNFunctions::topn_init(ctx, &dst2); + TopNFunctions::topn_merge(ctx, serialized, &dst2); + StringVal result = TopNFunctions::topn_finalize(ctx, dst2); + StringVal expected(""); + ASSERT_EQ(expected, result); +} + +TEST_F(TopNFunctionsTest, test_date_type) { + StringVal dst1; + TopNFunctions::topn_init(ctx, &dst1); + + DateTimeValue dt(20201001000000); + doris_udf::DateTimeVal dt_val; + dt.to_datetime_val(&dt_val); +for (uint32_t i = 0; i < 10; ++i) { + TopNFunctions::topn_update(ctx, dt_val, 1, &dst1); + } + StringVal serialized = TopNFunctions::topn_serialize(ctx, dst1); + + StringVal dst2; + TopNFunctions::topn_init(ctx, &dst2); + TopNFunctions::topn_merge(ctx, serialized, &dst2); + StringVal result = TopNFunctions::topn_finalize(ctx, dst2); + StringVal expected("2020-10-01 00:00:00:10"); + ASSERT_EQ(expected, result); +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/be/test/exprs/zipf_distribution.h b/be/test/exprs/zipf_distribution.h new file mode 100644 index 00000000000000..8ae2fda384f54e --- /dev/null +++ b/be/test/exprs/zipf_distribution.h @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +/** Refer to https://stackoverflow.com/questions/9983239/how-to-generate-zipf-distributed-numbers-efficiently + * Zipf-like random distribution. + * + * "Rejection-inversion to generate variates from monotone discrete + * distributions", Wolfgang Hörmann and Gerhard Derflinger + * ACM TOMACS 6.3 (1996): 169-184 + */ +template +class zipf_distribution +{ +public: + typedef RealType input_type; + typedef IntType result_type; + + static_assert(std::numeric_limits::is_integer, ""); + static_assert(!std::numeric_limits::is_integer, ""); + + zipf_distribution(const IntType n=std::numeric_limits::max(), + const RealType q=1.0) + : n(n) + , q(q) + , H_x1(H(1.5) - 1.0) + , H_n(H(n + 0.5)) + , dist(H_x1, H_n) + {} + + IntType operator()(std::mt19937& rng) + { + while (true) { + const RealType u = dist(rng); + const RealType x = H_inv(u); + const IntType k = clamp(std::round(x), 1, n); + if (u >= H(k + 0.5) - h(k)) { + return k; + } + } + } + +private: + /** Clamp x to [min, max]. */ + template + static constexpr T clamp(const T x, const T min, const T max) + { + return std::max(min, std::min(max, x)); + } + + /** exp(x) - 1 / x */ + static double + expxm1bx(const double x) + { + return (std::abs(x) > epsilon) + ? 
std::expm1(x) / x + : (1.0 + x/2.0 * (1.0 + x/3.0 * (1.0 + x/4.0))); + } + + /** H(x) = log(x) if q == 1, (x^(1-q) - 1)/(1 - q) otherwise. + * H(x) is an integral of h(x). + * + * Note the numerator is one less than in the paper, in order to work with all + * positive q. + */ + const RealType H(const RealType x) + { + const RealType log_x = std::log(x); + return expxm1bx((1.0 - q) * log_x) * log_x; + } + + /** log(1 + x) / x */ + static RealType + log1pxbx(const RealType x) + { + return (std::abs(x) > epsilon) + ? std::log1p(x) / x + : 1.0 - x * ((1/2.0) - x * ((1/3.0) - x * (1/4.0))); + } + + /** The inverse function of H(x) */ + const RealType H_inv(const RealType x) + { + const RealType t = std::max(-1.0, x * (1.0 - q)); + return std::exp(log1pxbx(t) * x); + } + + /** The hat function h(x) = 1 / (x ^ q) */ + const RealType h(const RealType x) + { + return std::exp(-q * std::log(x)); + } + + static constexpr RealType epsilon = 1e-8; + + IntType n; ///< Number of elements + RealType q; ///< Exponent + RealType H_x1; ///< H(x_1) + RealType H_n; ///< H(n) + std::uniform_real_distribution dist; ///< [H(x_1), H(n)] +}; \ No newline at end of file diff --git a/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md b/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md new file mode 100644 index 00000000000000..bb29738b7db54c --- /dev/null +++ b/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md @@ -0,0 +1,61 @@ +--- +{ + "title": "TOPN", + "language": "zh-CN" +} +--- + + + +# TOPN +## description +### Syntax + +`topn(expr, INT top_num[, INT space_expand_rate])` + +The topn function uses the Space-Saving algorithm to calculate the top_num frequent items in expr, and the result is the +frequent items and their occurrence times, which is an approximation + +The space_expand_rate parameter is optional and is used to set the number of counters used in the Space-Saving algorithm +``` +counter numbers = top_num * space_expand_rate +```` +The
higher value of space_expand_rate, the more accurate result will be. The default value is 50 + +## example +``` +MySQL [test]> select topn(keyword,10) from keyword_table where date>= '2020-06-01' and date <= '2020-06-19' ; ++------------------------------------------------------------------------------------------------------------+ +| topn(`keyword`, 10) | ++------------------------------------------------------------------------------------------------------------+ +| a:157, b:138, c:133, d:133, e:131, f:127, g:124, h:122, i:117, k:117 | ++------------------------------------------------------------------------------------------------------------+ + +MySQL [test]> select date,topn(keyword,10,100) from keyword_table where date>= '2020-06-17' and date <= '2020-06-19' group by date; ++------------+-----------------------------------------------------------------------------------------------+ +| date | topn(`keyword`, 10, 100) | ++------------+-----------------------------------------------------------------------------------------------+ +| 2020-06-19 | a:11, b:8, c:8, d:7, e:7, f:7, g:7, h:7, i:7, j:7 | +| 2020-06-18 | a:10, b:8, c:7, f:7, g:7, i:7, k:7, l:7, m:6, d:6 | +| 2020-06-17 | a:9, b:8, c:8, j:8, d:7, e:7, f:7, h:7, i:7, k:7 | ++------------+-----------------------------------------------------------------------------------------------+ +``` +## keyword +TOPN \ No newline at end of file diff --git a/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md b/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md new file mode 100644 index 00000000000000..db6ae23389523e --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md @@ -0,0 +1,60 @@ +--- +{ + "title": "TOPN", + "language": "zh-CN" +} +--- + + + +# TOPN +## description +### Syntax + +`topn(expr, INT top_num[, INT space_expand_rate])` + +该topn函数使用Space-Saving算法计算expr中的top_num个频繁项,结果为频繁项及其出现次数,该结果为近似值 + 
+space_expand_rate参数是可选项,该值用来设置Space-Saving算法中使用的counter个数 +``` +counter numbers = top_num * space_expand_rate +```` +space_expand_rate的值越大,结果越准确,默认值为50 + +## example +``` +MySQL [test]> select topn(keyword,10) from keyword_table where date>= '2020-06-01' and date <= '2020-06-19' ; ++------------------------------------------------------------------------------------------------------------+ +| topn(`keyword`, 10) | ++------------------------------------------------------------------------------------------------------------+ +| a:157, b:138, c:133, d:133, e:131, f:127, g:124, h:122, i:117, k:117 | ++------------------------------------------------------------------------------------------------------------+ + +MySQL [test]> select date,topn(keyword,10,100) from keyword_table where date>= '2020-06-17' and date <= '2020-06-19' group by date; ++------------+-----------------------------------------------------------------------------------------------+ +| date | topn(`keyword`, 10, 100) | ++------------+-----------------------------------------------------------------------------------------------+ +| 2020-06-19 | a:11, b:8, c:8, d:7, e:7, f:7, g:7, h:7, i:7, j:7 | +| 2020-06-18 | a:10, b:8, c:7, f:7, g:7, i:7, k:7, l:7, m:6, d:6 | +| 2020-06-17 | a:9, b:8, c:8, j:8, d:7, e:7, f:7, h:7, i:7, k:7 | ++------------+-----------------------------------------------------------------------------------------------+ +``` +## keyword +TOPN \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java index 0b0ef1bcc4ee67..9f41e695b50c38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Function; import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.ScalarFunction; 
+import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; @@ -455,6 +456,30 @@ private void analyzeBuiltinAggFunction(Analyzer analyzer) throws AnalysisExcepti } } } + + if (fnName.getFunction().equalsIgnoreCase("topn")) { + if (children.size() != 2 && children.size() != 3) { + throw new AnalysisException("topn(expr, INT [, B]) requires two or three parameters"); + } + if (!getChild(1).isConstant() || !getChild(1).getType().isIntegerType()) { + throw new AnalysisException("topn requires second parameter must be a constant Integer Type: " + + this.toSql()); + } + if (getChild(1).getType() != ScalarType.INT) { + Expr e = getChild(1).castTo(ScalarType.INT); + setChild(1, e); + } + if (children.size() == 3) { + if (!getChild(2).isConstant() || !getChild(2).getType().isIntegerType()) { + throw new AnalysisException("topn requires the third parameter must be a constant Integer Type: " + + this.toSql()); + } + if (getChild(2).getType() != ScalarType.INT) { + Expr e = getChild(2).castTo(ScalarType.INT); + setChild(2, e); + } + } + } } // Provide better error message for some aggregate builtins. 
These can be diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java index 579e19c3f786c5..10ecbdf6d8de8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java @@ -817,6 +817,70 @@ public boolean isNonNullResultWithNullParamFunctions(String funcName) { "_ZN5doris15BitmapFunctions25bitmap_intersect_finalizeINS_11StringValueEEEN9doris_udf9BigIntValEPNS3_15FunctionContextERKNS3_9StringValE") .build(); + private static final Map TOPN_UPDATE_SYMBOL = + ImmutableMap.builder() + .put(Type.BOOLEAN, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10BooleanValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.TINYINT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.SMALLINT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.INT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf6IntValEEEvPNS2_15FunctionContextERKT_RKS3_PNS2_9StringValE") + .put(Type.BIGINT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9BigIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.FLOAT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf8FloatValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.DOUBLE, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.CHAR, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPS3_") + .put(Type.VARCHAR, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPS3_") + .put(Type.DATE, + 
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.DATETIME, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.DECIMAL, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10DecimalValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.DECIMALV2, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf12DecimalV2ValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .put(Type.LARGEINT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11LargeIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE") + .build(); + + private static final Map TOPN_UPDATE_MORE_PARAM_SYMBOL = + ImmutableMap.builder() + .put(Type.BOOLEAN, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10BooleanValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.TINYINT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.SMALLINT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.INT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf6IntValEEEvPNS2_15FunctionContextERKT_RKS3_SA_PNS2_9StringValE") + .put(Type.BIGINT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9BigIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.FLOAT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf8FloatValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.DOUBLE, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.CHAR, + 
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PS3_") + .put(Type.VARCHAR, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PS3_") + .put(Type.DATE, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.DATETIME, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.DECIMAL, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10DecimalValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.DECIMALV2, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf12DecimalV2ValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .put(Type.LARGEINT, + "_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11LargeIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE") + .build(); + public Function getFunction(Function desc, Function.CompareMode mode) { List fns = functions.get(desc.functionName()); if (fns == null) { @@ -1185,6 +1249,26 @@ private void initAggregateBuiltins() { "_ZN5doris12HllFunctions13hll_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", true, false, true)); + // TopN + if (TOPN_UPDATE_SYMBOL.containsKey(t)) { + addBuiltin(AggregateFunction.createBuiltin("topn", + Lists.newArrayList(t, Type.INT), Type.VARCHAR, Type.VARCHAR, + "_ZN5doris13TopNFunctions9topn_initEPN9doris_udf15FunctionContextEPNS1_9StringValE", + TOPN_UPDATE_SYMBOL.get(t), + "_ZN5doris13TopNFunctions10topn_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", + "_ZN5doris13TopNFunctions14topn_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + "_ZN5doris13TopNFunctions13topn_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + true, false, true)); + addBuiltin(AggregateFunction.createBuiltin("topn", + 
Lists.newArrayList(t, Type.INT, Type.INT), Type.VARCHAR, Type.VARCHAR, + "_ZN5doris13TopNFunctions9topn_initEPN9doris_udf15FunctionContextEPNS1_9StringValE", + TOPN_UPDATE_MORE_PARAM_SYMBOL.get(t), + "_ZN5doris13TopNFunctions10topn_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", + "_ZN5doris13TopNFunctions14topn_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + "_ZN5doris13TopNFunctions13topn_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + true, false, true)); + } + if (STDDEV_UPDATE_SYMBOL.containsKey(t)) { addBuiltin(AggregateFunction.createBuiltin("stddev", Lists.newArrayList(t), STDDEV_RETTYPE_SYMBOL.get(t), Type.VARCHAR, diff --git a/gensrc/proto/olap_common.proto b/gensrc/proto/olap_common.proto index 846b25ad50f6cf..eceb05139ccbb4 100644 --- a/gensrc/proto/olap_common.proto +++ b/gensrc/proto/olap_common.proto @@ -48,3 +48,14 @@ enum CompressKind { COMPRESS_LZ4 = 2; } +message PCounter { + required string item = 1; + required uint64 count = 2; +} + +message PTopNCounter { + required uint32 top_num = 1; + required uint32 space_expand_rate = 2; + repeated PCounter counter = 3; +} + From 55c79cc4764009d1bf37822038a00091c64b48af Mon Sep 17 00:00:00 2001 From: yangwenbo6 Date: Tue, 27 Oct 2020 15:26:28 +0800 Subject: [PATCH 2/5] add doc --- .../sql-reference/sql-functions/aggregate-functions/topn.md | 4 ++-- .../sql-reference/sql-functions/aggregate-functions/topn.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md b/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md index bb29738b7db54c..43967286f6d26e 100644 --- a/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md +++ b/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md @@ -1,7 +1,7 @@ --- { "title": "TOPN", - "language": "zh-CN" + "language": "en" } --- @@ -36,7 +36,7 @@ frequent items and their occurrence times, which is an approximation The 
space_expand_rate parameter is optional and is used to set the number of counters used in the Space-Saving algorithm ``` counter numbers = top_num * space_expand_rate -```` +``` The higher value of space_expand_rate, the more accurate result will be. The default value is 50 ## example diff --git a/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md b/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md index db6ae23389523e..11c45440765fdb 100644 --- a/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md +++ b/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md @@ -35,7 +35,7 @@ under the License. space_expand_rate参数是可选项,该值用来设置Space-Saving算法中使用的counter个数 ``` counter numbers = top_num * space_expand_rate -```` +``` space_expand_rate的值越大,结果越准确,默认值为50 ## example From 1a5ec0617c896f9522c8438e30bd81a260e50375 Mon Sep 17 00:00:00 2001 From: Youngwb Date: Tue, 15 Dec 2020 10:48:05 +0800 Subject: [PATCH 3/5] Update gensrc/proto/olap_common.proto Co-authored-by: Mingyu Chen --- gensrc/proto/olap_common.proto | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gensrc/proto/olap_common.proto b/gensrc/proto/olap_common.proto index eceb05139ccbb4..7326b884e7a9f3 100644 --- a/gensrc/proto/olap_common.proto +++ b/gensrc/proto/olap_common.proto @@ -50,7 +50,7 @@ enum CompressKind { message PCounter { required string item = 1; - required uint64 count = 2; + required uint64 count = 2; } message PTopNCounter { @@ -58,4 +58,3 @@ message PTopNCounter { required uint32 space_expand_rate = 2; repeated PCounter counter = 3; } - From 1f187d5ac564094007bb51f4176fe0344c969801 Mon Sep 17 00:00:00 2001 From: yangwenbo6 Date: Tue, 15 Dec 2020 14:21:16 +0800 Subject: [PATCH 4/5] json result --- be/src/util/topn_counter.cpp | 20 ++++++++++++-------- be/test/exprs/topn_function_test.cpp | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/be/src/util/topn_counter.cpp b/be/src/util/topn_counter.cpp index 
77941f453ea960..b61246412211d8 100644 --- a/be/src/util/topn_counter.cpp +++ b/be/src/util/topn_counter.cpp @@ -16,6 +16,9 @@ // under the License. #include +#include +#include + #include "gen_cpp/olap_common.pb.h" #include "topn_counter.h" #include "slice.h" @@ -99,7 +102,7 @@ void TopNCounter::merge(doris::TopNCounter &&other) { uint64_t m1 = this_full ? _counter_vec->back().get_count() : 0; uint64_t m2 = another_full ? other._counter_vec->back().get_count() : 0; - + if (another_full == true) { for (auto &entry : *(this->_counter_map)) { entry.second.add_count(m2); @@ -123,16 +126,17 @@ void TopNCounter::finalize(std::string& finalize_str) { if (!_ordered) { sort_retain(_top_num); } - std::ostringstream oss; + // use json format print + rapidjson::StringBuffer buffer; + rapidjson::PrettyWriter writer(buffer); uint32_t k = 0; + writer.StartObject(); for (std::vector::const_iterator it = _counter_vec->begin(); it != _counter_vec->end() && k < _top_num; ++it, ++k) { - oss << it->get_item() << ":" << it->get_count() << ", "; - } - finalize_str = oss.str(); - // remove last ', ' char - if (finalize_str.size() > 0) { - finalize_str.erase(finalize_str.size()-2); + writer.Key(it->get_item().data()); + writer.Uint64(it->get_count()); } + writer.EndObject(); + finalize_str = buffer.GetString(); } } \ No newline at end of file diff --git a/be/test/exprs/topn_function_test.cpp b/be/test/exprs/topn_function_test.cpp index 89de53701e7adb..a14ec70d70bc04 100644 --- a/be/test/exprs/topn_function_test.cpp +++ b/be/test/exprs/topn_function_test.cpp @@ -238,7 +238,7 @@ TEST_F(TopNFunctionsTest, test_date_type) { DateTimeValue dt(20201001000000); doris_udf::DateTimeVal dt_val; dt.to_datetime_val(&dt_val); -for (uint32_t i = 0; i < 10; ++i) { + for (uint32_t i = 0; i < 10; ++i) { TopNFunctions::topn_update(ctx, dt_val, 1, &dst1); } StringVal serialized = TopNFunctions::topn_serialize(ctx, dst1); From 0ab589d5ddc85981145e6cf516f91ac64099354a Mon Sep 17 00:00:00 2001 From: Youngwb 
Date: Tue, 15 Dec 2020 15:17:01 +0800 Subject: [PATCH 5/5] fix test --- be/src/util/topn_counter.cpp | 6 +++--- be/test/exprs/topn_function_test.cpp | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/be/src/util/topn_counter.cpp b/be/src/util/topn_counter.cpp index b61246412211d8..bb6052fe781f04 100644 --- a/be/src/util/topn_counter.cpp +++ b/be/src/util/topn_counter.cpp @@ -16,7 +16,7 @@ // under the License. #include -#include +#include #include #include "gen_cpp/olap_common.pb.h" @@ -128,7 +128,7 @@ void TopNCounter::finalize(std::string& finalize_str) { } // use json format print rapidjson::StringBuffer buffer; - rapidjson::PrettyWriter writer(buffer); + rapidjson::Writer writer(buffer); uint32_t k = 0; writer.StartObject(); for (std::vector::const_iterator it = _counter_vec->begin(); it != _counter_vec->end() && k < _top_num; ++it, ++k) { @@ -139,4 +139,4 @@ void TopNCounter::finalize(std::string& finalize_str) { finalize_str = buffer.GetString(); } -} \ No newline at end of file +} diff --git a/be/test/exprs/topn_function_test.cpp b/be/test/exprs/topn_function_test.cpp index a14ec70d70bc04..f13c2e2a8b8d5f 100644 --- a/be/test/exprs/topn_function_test.cpp +++ b/be/test/exprs/topn_function_test.cpp @@ -178,7 +178,7 @@ TEST_F(TopNFunctionsTest, topn_update) { TopNFunctions::topn_update(ctx, src3, 2, &dst); StringVal result = TopNFunctions::topn_finalize(ctx, dst); - StringVal expected("a:10, b:2"); + StringVal expected("{\"a\":10,\"b\":2}"); ASSERT_EQ(expected, result); } @@ -210,7 +210,7 @@ TEST_F(TopNFunctionsTest, topn_merge) { TopNFunctions::topn_merge(ctx, val1, &dst); TopNFunctions::topn_merge(ctx, val2, &dst); StringVal result = TopNFunctions::topn_finalize(ctx, dst); - StringVal expected("a:20, b:8"); + StringVal expected("{\"a\":20,\"b\":8}"); ASSERT_EQ(expected, result); } @@ -227,7 +227,7 @@ TEST_F(TopNFunctionsTest, test_null_value) { TopNFunctions::topn_init(ctx, &dst2); TopNFunctions::topn_merge(ctx, serialized, &dst2); 
StringVal result = TopNFunctions::topn_finalize(ctx, dst2); - StringVal expected(""); + StringVal expected("{}"); ASSERT_EQ(expected, result); } @@ -247,7 +247,7 @@ TEST_F(TopNFunctionsTest, test_date_type) { TopNFunctions::topn_init(ctx, &dst2); TopNFunctions::topn_merge(ctx, serialized, &dst2); StringVal result = TopNFunctions::topn_finalize(ctx, dst2); - StringVal expected("2020-10-01 00:00:00:10"); + StringVal expected("{\"2020-10-01 00:00:00\":10}"); ASSERT_EQ(expected, result); } @@ -256,4 +256,4 @@ TEST_F(TopNFunctionsTest, test_date_type) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +}