From 09d8a83d54a2978553bf58fdc4142d2ed076485e Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Thu, 5 Sep 2019 15:31:43 +0800 Subject: [PATCH 1/2] Encapsulate HLL logic --- be/src/common/daemon.cpp | 2 + be/src/exec/csv_scan_node.cpp | 4 +- be/src/exprs/CMakeLists.txt | 8 +- be/src/exprs/aggregate_functions.cpp | 28 +- be/src/exprs/aggregate_functions.h | 47 +-- be/src/exprs/hll_function.cpp | 109 +++++ be/src/exprs/hll_function.h | 44 +++ be/src/exprs/hll_hash_function.cpp | 59 +-- be/src/exprs/hll_hash_function.h | 19 +- be/src/olap/aggregate_func.h | 73 ++-- be/src/olap/hll.cpp | 122 ------ be/src/olap/hll.h | 373 ++++++++++++++++-- be/src/olap/memtable.cpp | 17 +- be/src/udf/udf.h | 2 + be/test/exprs/CMakeLists.txt | 1 + be/test/exprs/hll_function_test.cpp | 116 ++++++ .../org/apache/doris/catalog/FunctionSet.java | 38 +- gensrc/script/doris_builtins_functions.py | 6 +- run-ut.sh | 1 + 19 files changed, 697 insertions(+), 372 deletions(-) create mode 100644 be/src/exprs/hll_function.cpp create mode 100644 be/src/exprs/hll_function.h create mode 100644 be/test/exprs/hll_function_test.cpp diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index a8493b12076dfc..c68794631f0cc2 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -52,6 +52,7 @@ #include "exprs/hll_hash_function.h" #include "exprs/timezone_db.h" #include "exprs/bitmap_function.h" +#include "exprs/hll_function.h" #include "geo/geo_functions.h" #include "olap/options.h" #include "util/time.h" @@ -272,6 +273,7 @@ void init_daemon(int argc, char** argv, const std::vector& paths) { GeoFunctions::init(); TimezoneDatabase::init(); BitmapFunctions::init(); + HllFunctions::init(); pthread_t tc_malloc_pid; pthread_create(&tc_malloc_pid, NULL, tcmalloc_gc_thread, NULL); diff --git a/be/src/exec/csv_scan_node.cpp b/be/src/exec/csv_scan_node.cpp index 0b287c341d16fa..fbcd3ae50fd9eb 100644 --- a/be/src/exec/csv_scan_node.cpp +++ b/be/src/exec/csv_scan_node.cpp @@ -672,14 +672,14 @@ void CsvScanNode::hll_hash(const char* src, int len, std::string* result) { std::string str(src, len); if (str != "\\N") { uint64_t hash = HashUtil::murmur_hash64A(src, len, HashUtil::MURMUR_SEED); - char buf[HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE]; + char buf[10]; // expliclit set buf[0] = HLL_DATA_EXPLICIT; buf[1] = 1; *((uint64_t*)(buf + 2)) = hash; *result = std::string(buf, sizeof(buf)); } else { - char buf[HllHashFunctions::HLL_EMPTY_SET_SIZE]; + char buf[1]; // empty set buf[0] = HLL_DATA_EMPTY; *result = std::string(buf, sizeof(buf)); diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index 9bbdd022735e08..6ff2d11e5de40d 100644 --- a/be/src/exprs/CMakeLists.txt +++ b/be/src/exprs/CMakeLists.txt @@ -65,10 +65,4 @@ add_library(Exprs agg_fn.cc new_agg_fn_evaluator.cc bitmap_function.cpp -) -#ADD_BE_TEST(json_function_test) -#ADD_BE_TEST(binary_predicate_test) -#ADD_BE_TEST(in_predicate_test) -#ADD_BE_TEST(expr-test) -#ADD_BE_TEST(hybird_set_test) -#ADD_BE_TEST(in-predicate-test) + hll_function.cpp) diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp index 825dd207d7b141..0c363d6c59fc71 100644 --- a/be/src/exprs/aggregate_functions.cpp +++ b/be/src/exprs/aggregate_functions.cpp @@ -1147,14 +1147,11 @@ void AggregateFunctions::hll_update(FunctionContext* ctx, const T& src, StringVa } DCHECK(!dst->is_null); - DCHECK_EQ(dst->len, std::pow(2, HLL_COLUMN_PRECISION)); + DCHECK_EQ(dst->len, HLL_REGISTERS_COUNT); uint64_t hash_value = AnyValUtil::hash64_murmur(src, HashUtil::MURMUR_SEED); if (hash_value != 0) { - // Use the lower bits to index into the number of streams and then - // find the first 1 bit after the index bits. int idx = hash_value % dst->len; - // uint8_t first_one_bit = __buiHLL_LENltin_ctzl(hash_value >> HLL_PRECISION) + 1; uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1; dst->ptr[idx] = std::max(dst->ptr[idx], first_one_bit); } @@ -1193,7 +1190,7 @@ void AggregateFunctions::hll_union_agg_update(FunctionContext* ctx, return; } DCHECK(!dst->is_null); - + dst->agg_parse_and_cal(src); return ; } @@ -1203,7 +1200,7 @@ void AggregateFunctions::hll_union_agg_merge(FunctionContext* ctx, const HllVal& DCHECK(!src.is_null); DCHECK_EQ(dst->len, HLL_COLUMN_DEFAULT_LEN); DCHECK_EQ(src.len, HLL_COLUMN_DEFAULT_LEN); - + dst->agg_merge(src); } @@ -1214,25 +1211,6 @@ doris_udf::BigIntVal AggregateFunctions::hll_union_agg_finalize(doris_udf::Funct return result; } -void AggregateFunctions::hll_union_agg_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* slot) { - hll_union_agg_init(ctx, static_cast (slot)); -} - -void AggregateFunctions::hll_union_agg_update(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src, - doris_udf::StringVal* dst) { - hll_union_agg_update(ctx, static_cast (src), static_cast (dst)); -} - -void AggregateFunctions::hll_union_agg_merge(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src, - doris_udf::StringVal* dst) { - hll_union_agg_merge(ctx, static_cast (src), static_cast (dst)); -} - -doris_udf::StringVal AggregateFunctions::hll_union_agg_finalize(doris_udf::FunctionContext* ctx, const StringVal& src) { - BigIntVal intVal = hll_union_agg_finalize(ctx, static_cast (src)); - return AnyValUtil::from_string_temp(ctx, std::to_string(intVal.val));; -} - int64_t AggregateFunctions::hll_algorithm(uint8_t *pdata, int data_len) { DCHECK_EQ(data_len, HLL_REGISTERS_COUNT); diff --git a/be/src/exprs/aggregate_functions.h b/be/src/exprs/aggregate_functions.h index 76c1775e5532b8..23b43724a39702 100644 --- a/be/src/exprs/aggregate_functions.h +++ b/be/src/exprs/aggregate_functions.h @@ -125,14 +125,6 @@ dst); const doris_udf::DecimalV2Val& src, doris_udf::StringVal* dst); - // static void decimal_avg_add_or_remove(doris_udf::FunctionContext* ctx, - // const doris_udf::DecimalVal& src, - // doris_udf::StringVal* dst, bool remove); - // static void decimal_avg_add_or_remove(doris_udf::FunctionContext* ctx, - // const doris_udf::DecimalVal& src, - // doris_udf::StringVal* dst) { - // return decimal_avg_add_or_remove(ctx, src, dst, false); - // } static doris_udf::DecimalVal decimal_avg_get_value(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& val); static doris_udf::DecimalV2Val decimalv2_avg_get_value(doris_udf::FunctionContext* ctx, @@ -194,22 +186,6 @@ dst); doris_udf::FunctionContext*, const doris_udf::StringVal& src); - // Hyperloglog distinct estimate algorithm. - // See these papers for more details. - // 1) Hyperloglog: The analysis of a near-optimal cardinality estimation - // algorithm (2007) - // 2) HyperLogLog in Practice (paper from google with some improvements) - static void hll_init(doris_udf::FunctionContext*, doris_udf::StringVal* slot); - template - static void hll_update(doris_udf::FunctionContext*, const T& src, doris_udf::StringVal* dst); - static void hll_merge( - doris_udf::FunctionContext*, - const doris_udf::StringVal& src, - doris_udf::StringVal* dst); - static doris_udf::StringVal hll_finalize( - doris_udf::FunctionContext*, - const doris_udf::StringVal& src); - // count and sum distinct algorithm in multi distinct template static void count_or_sum_distinct_numeric_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* dst); @@ -335,8 +311,19 @@ dst); static void offset_fn_update(doris_udf::FunctionContext*, const T& src, const doris_udf::BigIntVal&, const T&, T* dst); - // HLL value type calculate - // init sets buffer + // todo(kks): keep following HLL methods only for backward compatibility, we should remove these methods + // when doris 0.12 release + static void hll_init(doris_udf::FunctionContext*, doris_udf::StringVal* slot); + template + static void hll_update(doris_udf::FunctionContext*, const T& src, doris_udf::StringVal* dst); + static void hll_merge( + doris_udf::FunctionContext*, + const doris_udf::StringVal& src, + doris_udf::StringVal* dst); + static doris_udf::StringVal hll_finalize( + doris_udf::FunctionContext*, + const doris_udf::StringVal& src); + static void hll_union_agg_init(doris_udf::FunctionContext*, doris_udf::HllVal* slot); // fill all register accroading to hll set type static void hll_union_agg_update(doris_udf::FunctionContext*, const doris_udf::HllVal& src, @@ -351,14 +338,6 @@ dst); doris_udf::FunctionContext*, const doris_udf::HllVal& src); - //for backward compatibility, we could remove the following four HLL methods after doris 0.11 version - static void hll_union_agg_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* slot); - static void hll_union_agg_update(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src, - doris_udf::StringVal* dst); - static void hll_union_agg_merge(doris_udf::FunctionContext* ctx,const doris_udf::StringVal& src, - doris_udf::StringVal* dst); - static doris_udf::StringVal hll_union_agg_finalize(doris_udf::FunctionContext* ctx, const StringVal& src); - // calculate result static int64_t hll_algorithm(uint8_t *pdata, int data_len); static int64_t hll_algorithm(const StringVal &dst) { diff --git a/be/src/exprs/hll_function.cpp b/be/src/exprs/hll_function.cpp new file mode 100644 index 00000000000000..1769a211f6a6cb --- /dev/null +++ b/be/src/exprs/hll_function.cpp @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/hll_function.h" + +#include "exprs/anyval_util.h" +#include "util/hash_util.hpp" + +namespace doris { + +using doris_udf::BigIntVal; +using doris_udf::StringVal; + +void HllFunctions::init() { +} + +StringVal HllFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) { + const int HLL_SINGLE_VALUE_SIZE = 10; + const int HLL_EMPTY_SIZE = 1; + std::string buf; + std::unique_ptr hll {new HyperLogLog()}; + if (!input.is_null) { + uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED); + hll.reset(new HyperLogLog(hash_value)); + buf.resize(HLL_SINGLE_VALUE_SIZE); + } else { + buf.resize(HLL_EMPTY_SIZE); + } + hll->serialize((char*)buf.c_str()); + return AnyValUtil::from_string_temp(ctx, buf); +} + +void HllFunctions::hll_init(FunctionContext *, StringVal* dst) { + dst->is_null = false; + dst->len = sizeof(HyperLogLog); + dst->ptr = (uint8_t*)new HyperLogLog(); +} + +template +void HllFunctions::hll_update(FunctionContext *, const T &src, StringVal* dst) { + if (src.is_null) { + return; + } + + uint64_t hash_value = AnyValUtil::hash64_murmur(src, HashUtil::MURMUR_SEED); + if (hash_value != 0) { + auto* dst_hll = reinterpret_cast(dst->ptr); + dst_hll->update(hash_value); + } +} +void HllFunctions::hll_merge(FunctionContext*, const StringVal &src, StringVal* dst) { + HyperLogLog src_hll = HyperLogLog((char*)src.ptr); + auto* dst_hll = reinterpret_cast(dst->ptr); + dst_hll->merge(src_hll); +} + +BigIntVal HllFunctions::hll_finalize(FunctionContext*, const StringVal &src) { + auto* src_hll = reinterpret_cast(src.ptr); + BigIntVal result(src_hll->estimate_cardinality()); + delete src_hll; + return result; +} + +BigIntVal HllFunctions::hll_cardinality(FunctionContext* ctx, const StringVal& input) { + if (input.is_null) { + return BigIntVal::null(); + } + StringVal dst; + hll_init(ctx, &dst); + hll_merge(ctx, input, &dst); + return hll_finalize(ctx, dst); +} + +StringVal HllFunctions::hll_serialize(FunctionContext *ctx, const StringVal &src) { + auto* src_hll = reinterpret_cast(src.ptr); + StringVal result(ctx, HLL_COLUMN_DEFAULT_LEN); + int size = src_hll->serialize((char*)result.ptr); + result.resize(ctx, size); + delete src_hll; + return result; +} + +template void HllFunctions::hll_update(FunctionContext*, const BooleanVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const TinyIntVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const SmallIntVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const IntVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const BigIntVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const FloatVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const DoubleVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const StringVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const DateTimeVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const LargeIntVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const DecimalVal&, StringVal*); +template void HllFunctions::hll_update(FunctionContext*, const DecimalV2Val&, StringVal*); +} diff --git a/be/src/exprs/hll_function.h b/be/src/exprs/hll_function.h new file mode 100644 index 00000000000000..e08cbff7bea716 --- /dev/null +++ b/be/src/exprs/hll_function.h @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_SRC_QUERY_EXPRS_HLL_FUNCTION_H +#define DORIS_BE_SRC_QUERY_EXPRS_HLL_FUNCTION_H + +#include "udf/udf.h" + +namespace doris { + +class HllFunctions { +public: + static void init(); + static StringVal hll_hash(FunctionContext* ctx, const StringVal& dest_base); + static void hll_init(FunctionContext*, StringVal* dst); + + template + static void hll_update(FunctionContext*, const T& src, StringVal* dst); + + static void hll_merge(FunctionContext*,const StringVal& src, StringVal* dst); + + static BigIntVal hll_finalize(FunctionContext*, const StringVal& src); + + static StringVal hll_serialize(FunctionContext* ctx, const StringVal& src); + + static BigIntVal hll_cardinality(FunctionContext* ctx, const StringVal& src); +}; +} + +#endif diff --git a/be/src/exprs/hll_hash_function.cpp b/be/src/exprs/hll_hash_function.cpp index 3502036c33d7d5..1534f27f91b047 100644 --- a/be/src/exprs/hll_hash_function.cpp +++ b/be/src/exprs/hll_hash_function.cpp @@ -15,62 +15,34 @@ // specific language governing permissions and limitations // under the License. -#include "exprs/hll_hash_function.h" - -#include "exprs/expr.h" -#include "runtime/tuple_row.h" -#include "runtime/datetime_value.h" -#include "util/path_builder.h" -#include "runtime/string_value.hpp" #include "exprs/aggregate_functions.h" -#include "exprs/cast_functions.h" -#include "olap/olap_common.h" -#include "olap/utils.h" +#include "exprs/hll_hash_function.h" namespace doris { using doris_udf::BigIntVal; using doris_udf::StringVal; -const int HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE = 10; -const int HllHashFunctions::HLL_EMPTY_SET_SIZE = 1; - void HllHashFunctions::init() { } -StringVal HllHashFunctions::create_string_result(doris_udf::FunctionContext* ctx, - const StringVal& val, const bool is_null) { - StringVal result; - if (is_null) { - // HLL_DATA_EMPTY - char buf[HLL_EMPTY_SET_SIZE]; - buf[0] = HLL_DATA_EMPTY; - result = AnyValUtil::from_buffer_temp(ctx, buf, sizeof(buf)); +StringVal HllHashFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) { + const int HLL_SINGLE_VALUE_SIZE = 10; + const int HLL_EMPTY_SIZE = 1; + std::string buf; + std::unique_ptr hll {new HyperLogLog()}; + if (!input.is_null) { + uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED); + hll.reset(new HyperLogLog(hash_value)); + buf.resize(HLL_SINGLE_VALUE_SIZE); } else { - // HLL_DATA_EXPLHLL_DATA_EXPLICIT - uint64_t hash = HashUtil::murmur_hash64A(val.ptr, val.len, HashUtil::MURMUR_SEED); - char buf[HLL_INIT_EXPLICT_SET_SIZE]; - buf[0] = HLL_DATA_EXPLICIT; - buf[1] = 1; - *((uint64_t*)(buf + 2)) = hash; - result = AnyValUtil::from_buffer_temp(ctx, buf, sizeof(buf)); - } - return result; -} - -StringVal HllHashFunctions::hll_hash(doris_udf::FunctionContext* ctx, - const StringVal& input) { - return create_string_result(ctx, input, input.is_null); + buf.resize(HLL_EMPTY_SIZE); + } + hll->serialize((char*)buf.c_str()); + return AnyValUtil::from_string_temp(ctx, buf); } -StringVal HllHashFunctions::hll_cardinality(doris_udf::FunctionContext* ctx, - const doris_udf::StringVal& dest_base) { - BigIntVal intVal = hll_cardinality(ctx, static_cast (dest_base)); - return AnyValUtil::from_string_temp(ctx, std::to_string(intVal.val)); -} - -BigIntVal HllHashFunctions::hll_cardinality(doris_udf::FunctionContext* ctx, - const HllVal& input) { +BigIntVal HllHashFunctions::hll_cardinality(FunctionContext* ctx, const HllVal& input) { if (input.is_null) { return BigIntVal::null(); } @@ -79,4 +51,5 @@ BigIntVal HllHashFunctions::hll_cardinality(doris_udf::FunctionContext* ctx, AggregateFunctions::hll_union_agg_update(ctx, input, &dst); return AggregateFunctions::hll_union_agg_finalize(ctx, dst); } + } diff --git a/be/src/exprs/hll_hash_function.h b/be/src/exprs/hll_hash_function.h index a6d87985187b7c..af47dc0c216c2c 100644 --- a/be/src/exprs/hll_hash_function.h +++ b/be/src/exprs/hll_hash_function.h @@ -18,33 +18,22 @@ #ifndef DORIS_BE_SRC_QUERY_EXPRS_HLL_HASH_FUNCTION_H #define DORIS_BE_SRC_QUERY_EXPRS_HLL_HASH_FUNCTION_H -#include "runtime/primitive_type.h" #include "udf/udf.h" #include "util/hash_util.hpp" #include "exprs/anyval_util.h" -#include "runtime/datetime_value.h" namespace doris { class Expr; -class OpcodeRegistry; class TupleRow; +// todo(kks): for backward compatibility, we should remove this class +// when doris 0.12 release class HllHashFunctions { public: static void init(); - static StringVal hll_hash(doris_udf::FunctionContext* ctx, - const doris_udf::StringVal& dest_base); - static BigIntVal hll_cardinality(doris_udf::FunctionContext* ctx, - const doris_udf::HllVal& dest_base); - //for backward compatibility, we could remove this method after doris 0.11 version - static StringVal hll_cardinality(doris_udf::FunctionContext* ctx, - const doris_udf::StringVal& dest_base); - static StringVal create_string_result(doris_udf::FunctionContext* ctx, - const StringVal& str, const bool is_null); - - static const int HLL_INIT_EXPLICT_SET_SIZE; - static const int HLL_EMPTY_SET_SIZE; + static StringVal hll_hash(FunctionContext* ctx, const StringVal& dest_base); + static BigIntVal hll_cardinality(FunctionContext* ctx, const HllVal& dest_base); }; } diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h index 5da16e412a4da4..64c13b37525a56 100644 --- a/be/src/olap/aggregate_func.h +++ b/be/src/olap/aggregate_func.h @@ -399,64 +399,43 @@ struct AggregateFuncTraits template <> struct AggregateFuncTraits { static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { - // TODO(zc): refactor HLL implementation DCHECK_EQ(src_null, false); dst->set_not_null(); - auto* dest_slice = reinterpret_cast(dst->mutable_cell_ptr()); - char* mem = arena->Allocate(sizeof(HllContext)); - auto* context = new (mem) HllContext; - HllSetHelper::init_context(context); - HllSetHelper::fill_set(src, context); - context->has_value = true; - char* variable_ptr = arena->Allocate(sizeof(HllContext*) + HLL_COLUMN_DEFAULT_LEN); - *(size_t*)(variable_ptr) = (size_t)(context); - variable_ptr += sizeof(HllContext*); - dest_slice->data = variable_ptr; - dest_slice->size = HLL_COLUMN_DEFAULT_LEN; + + auto* src_slice = reinterpret_cast(src); + auto* dst_slice = reinterpret_cast(dst->mutable_cell_ptr()); + + dst_slice->size = sizeof(HyperLogLog); + dst_slice->data = (char*)new HyperLogLog(src_slice->data);; } static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { DCHECK_EQ(src.is_null(), false); - auto l_slice = reinterpret_cast(dst->mutable_cell_ptr()); - auto context = *reinterpret_cast(l_slice->data - sizeof(HllContext*)); - HllSetHelper::fill_set((const char*)src.cell_ptr(), context); + + auto* dst_slice = reinterpret_cast(dst->mutable_cell_ptr()); + auto* src_slice = reinterpret_cast(src.cell_ptr()); + auto* dst_hll = reinterpret_cast(dst_slice->data); + + // fixme(kks): trick here, need improve + if (arena == nullptr) { // for query + HyperLogLog src_hll = HyperLogLog(src_slice->data); + dst_hll->merge(src_hll); + } else { // for stream load + auto* src_hll = reinterpret_cast(src_slice->data); + dst_hll->merge(*src_hll); + + delete src_hll; + } } static void finalize(RowCursorCell* src, Arena* arena) { - auto slice = reinterpret_cast(src->mutable_cell_ptr()); - auto context = *reinterpret_cast(slice->data - sizeof(HllContext*)); - std::map index_to_value; - if (context->has_sparse_or_full || - context->hash64_set->size() > HLL_EXPLICLIT_INT64_NUM) { - HllSetHelper::set_max_register(context->registers, HLL_REGISTERS_COUNT, - *(context->hash64_set)); - for (int i = 0; i < HLL_REGISTERS_COUNT; i++) { - if (context->registers[i] != 0) { - index_to_value[i] = context->registers[i]; - } - } - } - int sparse_set_len = index_to_value.size() * - (sizeof(HllSetResolver::SparseIndexType) - + sizeof(HllSetResolver::SparseValueType)) - + sizeof(HllSetResolver::SparseLengthValueType); - int result_len = 0; - - if (sparse_set_len >= HLL_COLUMN_DEFAULT_LEN) { - // full set - HllSetHelper::set_full(slice->data, context->registers, - HLL_REGISTERS_COUNT, result_len); - } else if (index_to_value.size() > 0) { - // sparse set - HllSetHelper::set_sparse(slice->data, index_to_value, result_len); - } else if (context->hash64_set->size() > 0) { - // expliclit set - HllSetHelper::set_explicit(slice->data, *(context->hash64_set), result_len); - } + auto *slice = reinterpret_cast(src->mutable_cell_ptr()); + auto *hll = reinterpret_cast(slice->data); - slice->size = result_len & 0xffff; + slice->data = arena->Allocate(HLL_COLUMN_DEFAULT_LEN); + slice->size = hll->serialize(slice->data); - delete context->hash64_set; + delete hll; } }; // when data load, after bitmap_init fucntion, bitmap_union column won't be null diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp index ce85991e076bc5..3e3f6861b46bb0 100644 --- a/be/src/olap/hll.cpp +++ b/be/src/olap/hll.cpp @@ -19,10 +19,6 @@ #include #include -#include -#include - -#include "util/slice.h" using std::map; using std::nothrow; @@ -70,81 +66,6 @@ void HllSetResolver::parse() { } } -void HllSetResolver::fill_registers(char* registers, int len) { - if (_set_type == HLL_DATA_EXPLICIT) { - for (int i = 0; i < get_explicit_count(); ++i) { - uint64_t hash_value = get_explicit_value(i); - int idx = hash_value % len; - uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1; - registers[idx] = std::max((uint8_t)registers[idx], first_one_bit); - } - } else if (_set_type == HLL_DATA_SPRASE) { - std::map& sparse_map = get_sparse_map(); - for (std::map::iterator iter = sparse_map.begin(); - iter != sparse_map.end(); iter++) { - registers[iter->first] = - std::max((uint8_t)registers[iter->first], (uint8_t)iter->second); - } - } else if (_set_type == HLL_DATA_FULL) { - char* full_value = get_full_value(); - for (int i = 0; i < len; i++) { - registers[i] = std::max((uint8_t)registers[i], (uint8_t)full_value[i]); - } - - } else { - // HLL_DATA_EMPTY - } -} - -void HllSetResolver::fill_index_to_value_map(std::map* index_to_value, int len) { - if (_set_type == HLL_DATA_EXPLICIT) { - for (int i = 0; i < get_explicit_count(); ++i) { - uint64_t hash_value = get_explicit_value(i); - int idx = hash_value % len; - uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1; - if (index_to_value->find(idx) != index_to_value->end()) { - (*index_to_value)[idx] = - (*index_to_value)[idx] < first_one_bit ? first_one_bit : (*index_to_value)[idx]; - } else { - (*index_to_value)[idx] = first_one_bit; - } - } - } else if (_set_type == HLL_DATA_SPRASE) { - std::map& sparse_map = get_sparse_map(); - for (std::map::iterator iter = sparse_map.begin(); - iter != sparse_map.end(); iter++) { - if (index_to_value->find(iter->first) != index_to_value->end()) { - (*index_to_value)[iter->first] = - (*index_to_value)[iter->first] - < iter->second ? iter->second : (*index_to_value)[iter->first]; - } else { - (*index_to_value)[iter->first] = iter->second; - } - } - } else if (_set_type == HLL_DATA_FULL) { - char* registers = get_full_value(); - for (int i = 0; i < len; i++) { - if (registers[i] != 0) { - if (index_to_value->find(i) != index_to_value->end()) { - (*index_to_value)[i] = - (*index_to_value)[i] < registers[i] ? registers[i] : (*index_to_value)[i]; - } else { - (*index_to_value)[i] = registers[i]; - } - } - } - } -} - -void HllSetResolver::fill_hash64_set(std::set* hash_set) { - if (_set_type == HLL_DATA_EXPLICIT) { - for (int i = 0; i < get_explicit_count(); ++i) { - uint64_t hash_value = get_explicit_value(i); - hash_set->insert(hash_value); - } - } -} - void HllSetHelper::set_sparse( char *result, const std::map& index_to_value, int& len) { result[0] = HLL_DATA_SPRASE; @@ -178,13 +99,6 @@ void HllSetHelper::set_explicit(char* result, const std::set& hash_val len += sizeof(uint64_t) * hash_value_set.size(); } -void HllSetHelper::set_full(char* result, const char* registers, - const int registers_len, int& len) { - result[0] = HLL_DATA_FULL; - memcpy(result + 1, registers, registers_len); - len = registers_len + sizeof(HllSetResolver::SetTypeValueType); -} - void HllSetHelper::set_full(char* result, const std::map& index_to_value, const int registers_len, int& len) { @@ -196,40 +110,4 @@ void HllSetHelper::set_full(char* result, len = registers_len + sizeof(HllSetResolver::SetTypeValueType); } -void HllSetHelper::set_max_register(char* registers, int registers_len, - const std::set& hash_set) { - for (std::set::const_iterator iter = hash_set.begin(); - iter != hash_set.end(); iter++) { - uint64_t hash_value = *iter; - int idx = hash_value % registers_len; - uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1; - registers[idx] = std::max((uint8_t)registers[idx], first_one_bit); - } -} - -void HllSetHelper::fill_set(const char* data, HllContext* context) { - HllSetResolver resolver; - const Slice* slice = reinterpret_cast(data); - if (OLAP_UNLIKELY(slice->data == nullptr)) { - return; - } - resolver.init(slice->data, slice->size); - resolver.parse(); - if (resolver.get_hll_data_type() == HLL_DATA_EXPLICIT) { - // expliclit set - resolver.fill_hash64_set(context->hash64_set); - } else if (resolver.get_hll_data_type() != HLL_DATA_EMPTY) { - // full or sparse - context->has_sparse_or_full = true; - resolver.fill_registers(context->registers, HLL_REGISTERS_COUNT); - } -} - -void HllSetHelper::init_context(HllContext* context) { - memset(context->registers, 0, HLL_REGISTERS_COUNT); - context->hash64_set = new std::set(); - context->has_value = false; - context->has_sparse_or_full = false; -} - } // namespace doris diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h index 291039c7d9ab8c..2619dcf0c2dbed 100644 --- a/be/src/olap/hll.h +++ b/be/src/olap/hll.h @@ -33,14 +33,339 @@ const static int HLL_REGISTERS_COUNT = 16384; // maximum size in byte of serialized HLL: type(1) + registers (2^14) const static int HLL_COLUMN_DEFAULT_LEN = 16385; -struct HllContext { - bool has_value; - bool has_sparse_or_full; - char registers[HLL_REGISTERS_COUNT]; - std::set* hash64_set = nullptr; +// Hyperloglog distinct estimate algorithm. +// See these papers for more details. +// 1) Hyperloglog: The analysis of a near-optimal cardinality estimation +// algorithm (2007) +// 2) HyperLogLog in Practice (paper from google with some improvements) + +// 通过varchar的变长编码方式实现hll集合 +// 实现hll列中间计算结果的处理 +// empty 空集合 +// explicit 存储64位hash值的集合 +// sparse 存储hll非0的register +// full 存储全部的hll register +// empty -> explicit -> sparse -> full 四种类型的转换方向不可逆 +// 第一个字节存放hll集合的类型 0:empty 1:explicit 2:sparse 3:full +// 已决定后面的数据怎么解析 +class HyperLogLog { +public: + HyperLogLog(): _type(HLL_DATA_EMPTY){ + memset(_registers, 0, HLL_REGISTERS_COUNT); + } + + explicit HyperLogLog(uint64_t hash_value): _type(HLL_DATA_EXPLICIT) { + _hash_set.emplace(hash_value); + } + + explicit HyperLogLog(char* src) { + _type = (HllDataType)src[0]; + memset(_registers, 0, HLL_REGISTERS_COUNT); + char* sparse_data = nullptr; + switch (_type) { + case HLL_DATA_EXPLICIT: + // first byte : type + // second~five byte : hash values's number + // five byte later : hash value + { + auto _explicit_num = (uint8_t) (src[sizeof(SetTypeValueType)]); + auto *_explicit_value = (uint64_t *) (src + sizeof(SetTypeValueType) + sizeof(uint8_t)); + for (int i = 0; i < _explicit_num; ++i) { + _hash_set.insert(_explicit_value[i]); + } + } + break; + case HLL_DATA_SPRASE: + // first byte : type + // second ~(2^HLL_COLUMN_PRECISION)/8 byte : bitmap mark which is not zero + // 2^HLL_COLUMN_PRECISION)/8 + 1以后value + { + auto* _sparse_count = (SparseLengthValueType*)(src + sizeof (SetTypeValueType)); + sparse_data = src + sizeof(SetTypeValueType) + sizeof(SparseLengthValueType); + std::map _sparse_map; + for (int i = 0; i < *_sparse_count; i++) { + auto* index = (SparseIndexType*)sparse_data; + sparse_data += sizeof(SparseIndexType); + auto* value = (SparseValueType*)sparse_data; + _sparse_map[*index] = *value; + sparse_data += sizeof(SetTypeValueType); + } + + for (auto iter: _sparse_map) { + _registers[iter.first] = (uint8_t)iter.second; + } + } + break; + case HLL_DATA_FULL: + // first byte : type + // second byte later : hll register value + { + char* _full_value_position = src + sizeof (SetTypeValueType); + memcpy(_registers, _full_value_position, HLL_REGISTERS_COUNT); + } + break; + case HLL_DATA_EMPTY: + break; + default: + break; + } + } + + typedef uint8_t SetTypeValueType; + typedef int32_t SparseLengthValueType; + typedef uint16_t SparseIndexType; + typedef uint8_t SparseValueType; + + static void update_registers(char* registers, uint64_t hash_value) { + // Use the lower bits to index into the number of streams and then + // find the first 1 bit after the index bits. + int idx = hash_value % HLL_REGISTERS_COUNT; + uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1; + registers[idx] = std::max((uint8_t)registers[idx], first_one_bit); + } + + static void merge_hash_set_to_registers(char* registers, const std::set& hash_set) { + for (auto hash_value: hash_set) { + update_registers(registers, hash_value); + } + } + + static void merge_registers(char* registers, const char* other_registers) { + for (int i = 0; i < doris::HLL_REGISTERS_COUNT; ++i) { + registers[i] = std::max(registers[i], other_registers[i]); + } + } + + static int serialize_full(char* result, char* registers) { + result[0] = HLL_DATA_FULL; + memcpy(result + 1, registers, HLL_REGISTERS_COUNT); + return HLL_COLUMN_DEFAULT_LEN; + } + + static int serialize_sparse(char *result, const std::map& index_to_value) { + result[0] = HLL_DATA_SPRASE; + int len = sizeof(SetTypeValueType) + sizeof(SparseLengthValueType); + char* write_value_pos = result + len; + for (auto iter = index_to_value.begin(); + iter != index_to_value.end(); iter++) { + write_value_pos[0] = (char)(iter->first & 0xff); + write_value_pos[1] = (char)(iter->first >> 8 & 0xff); + write_value_pos[2] = iter->second; + write_value_pos += 3; + } + int registers_count = index_to_value.size(); + len += registers_count * (sizeof(SparseIndexType)+ sizeof(SparseValueType)); + *(int*)(result + 1) = registers_count; + return len; + } + + static int serialize_explicit(char* result, const std::set& hash_value_set) { + result[0] = HLL_DATA_EXPLICIT; + result[1] = (uint8_t)(hash_value_set.size()); + int len = sizeof(SetTypeValueType) + sizeof(uint8_t); + char* write_pos = result + len; + for (auto iter = hash_value_set.begin(); + iter != hash_value_set.end(); iter++) { + uint64_t hash_value = *iter; + *(uint64_t*)write_pos = hash_value; + write_pos += 8; + } + len += sizeof(uint64_t) * hash_value_set.size(); + return len; + } + + // change the _type to HLL_DATA_FULL directly has two reasons: + // 1. keep behavior consistent with before + // 2. make the code logic is simple + void update(const uint64_t hash_value) { + _type = HLL_DATA_FULL; + update_registers(_registers, hash_value); + } + + void merge(const HyperLogLog& other) { + if (other._type == HLL_DATA_EMPTY) { + return; + } + + // _type must change + if (_type == HLL_DATA_EMPTY) { + _type = other._type; + switch (other._type) { + case HLL_DATA_EXPLICIT: + _hash_set = other._hash_set; + return; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); + return; + default: + return; + } + } + + // _type maybe change + if (_type == HLL_DATA_EXPLICIT) { + switch (other._type) { + case HLL_DATA_EXPLICIT: + _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); + return; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); + _type = other._type; + return; + default: + return; + } + } + + // _type maybe change + if (_type == HLL_DATA_SPRASE) { + switch (other._type) { + case HLL_DATA_EXPLICIT: + _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); + return; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + merge_registers(_registers, other._registers); + _type = other._type; + return; + default: + return; + } + } + + // _type not change + if (_type == HLL_DATA_FULL) { + switch (other._type) { + case HLL_DATA_EXPLICIT: + _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); + return; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + merge_registers(_registers, other._registers); + return; + default: + return; + } + } + } + + int serialize(char* dest) { + if (_type == HLL_DATA_EMPTY) { + dest[0] = _type; + return 1; + } + + std::map index_to_value; + if (_type == HLL_DATA_SPRASE || _type == HLL_DATA_FULL || + _hash_set.size() > HLL_EXPLICLIT_INT64_NUM) { + merge_hash_set_to_registers(_registers, _hash_set); + for (int i = 0; i < HLL_REGISTERS_COUNT; i++) { + if (_registers[i] != 0) { + index_to_value[i] = _registers[i]; + } + } + } + + int sparse_set_len = index_to_value.size() * (sizeof(SparseIndexType) + + sizeof(SparseValueType) + sizeof(SparseLengthValueType)); + int result_len = 0; + if (sparse_set_len >= HLL_COLUMN_DEFAULT_LEN) { + result_len = serialize_full(dest, _registers); + } else if (index_to_value.size() > 0) { + result_len = serialize_sparse(dest, index_to_value); + } else if (_hash_set.size() > 0) { + result_len = serialize_explicit(dest, _hash_set); + } + + return result_len & 0xffff;; + } + + int64_t estimate_cardinality() { + if (_type == HLL_DATA_EMPTY) { + return 0; + } + + merge_hash_set_to_registers(_registers, _hash_set); + + const int num_streams = HLL_REGISTERS_COUNT; + // Empirical constants for the algorithm. + float alpha = 0; + + if (num_streams == 16) { + alpha = 0.673f; + } else if (num_streams == 32) { + alpha = 0.697f; + } else if (num_streams == 64) { + alpha = 0.709f; + } else { + alpha = 0.7213f / (1 + 1.079f / num_streams); + } + + float harmonic_mean = 0; + int num_zero_registers = 0; + + for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { + harmonic_mean += powf(2.0f, -_registers[i]); + + if (_registers[i] == 0) { + ++num_zero_registers; + } + } + + harmonic_mean = 1.0f / harmonic_mean; + double estimate = alpha * num_streams * num_streams * harmonic_mean; + // according to HerperLogLog current correction, if E is cardinal + // E =< num_streams * 2.5 , LC has higher accuracy. + // num_streams * 2.5 < E , HerperLogLog has higher accuracy. + // Generally , we can use HerperLogLog to produce value as E. + if (estimate <= num_streams * 2.5 && num_zero_registers != 0) { + // Estimated cardinality is too low. Hll is too inaccurate here, instead use + // linear counting. + estimate = num_streams * log(static_cast(num_streams) / num_zero_registers); + } else if (num_streams == 16384 && estimate < 72000) { + // when Linear Couint change to HerperLoglog according to HerperLogLog Correction, + // there are relatively large fluctuations, we fixed the problem refer to redis. + double bias = 5.9119 * 1.0e-18 * (estimate * estimate * estimate * estimate) + - 1.4253 * 1.0e-12 * (estimate * estimate * estimate) + + 1.2940 * 1.0e-7 * (estimate * estimate) + - 5.2921 * 1.0e-3 * estimate + + 83.3216; + estimate -= estimate * (bias / 100); + } + return (int64_t)(estimate + 0.5); + } + + // only for debug + std::string to_string() { + switch (_type) { + case HLL_DATA_EMPTY: + return {}; + case HLL_DATA_EXPLICIT: + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + { + std::string str {"hash set size: "}; + str.append(std::to_string(_hash_set.size())); + str.append("\ncardinality:\t"); + str.append(std::to_string(estimate_cardinality())); + str.append("\ntype:\t"); + str.append(std::to_string(_type)); + return str; + } + default: + return {}; + } + } + +private: + HllDataType _type; + char _registers[HLL_REGISTERS_COUNT]; + std::set _hash_set; }; -// help parse hll set +// todo(kks): remove this when dpp_sink class was removed class HllSetResolver { public: HllSetResolver() : _buf_ref(nullptr), @@ -82,21 +407,11 @@ class HllSetResolver { return _explicit_value[index]; }; - // get explicit index value 64bit - char* get_explicit_value() { - return (char*)_explicit_value; - }; - // get full register value char* get_full_value() { return _full_value_position; }; - // get sparse (index, value) count - int get_sparse_count() { - return (int)*_sparse_count; - }; - // get (index, value) map std::map& get_sparse_map() { return _sparse_map; @@ -104,16 +419,6 @@ class HllSetResolver { // parse set , call after copy() or init() void parse(); - - // fill registers with set - void fill_registers(char* registers, int len); - - // fill map with set - void fill_index_to_value_map(std::map* index_to_value, int len); - - // fill hash map - void fill_hash64_set(std::set* hash_set); - private : char* _buf_ref; // set int _buf_len; // set len @@ -125,27 +430,13 @@ private : SparseLengthValueType* _sparse_count; }; -// 通过varchar的变长编码方式实现hll集合 -// 实现hll列中间计算结果的处理 -// empty 空集合 -// explicit 存储64位hash值的集合 -// sparse 存储hll非0的register -// full 存储全部的hll register -// empty -> explicit -> sparse -> full 四种类型的转换方向不可逆 -// 第一个字节存放hll集合的类型 0:empty 1:explicit 2:sparse 3:full -// 已决定后面的数据怎么解析 +// todo(kks): remove this when dpp_sink class was removed class HllSetHelper { public: static void set_sparse(char *result, const std::map& index_to_value, int& len); static void set_explicit(char* result, const std::set& hash_value_set, int& len); - static void set_full(char* result, const char* registers, const int set_len, int& len); static void set_full(char* result, const std::map& index_to_value, const int set_len, int& len); - static void set_max_register(char *registers, - int registers_len, - const std::set& hash_set); - static void fill_set(const char* data, HllContext* context); - static void init_context(HllContext* context); }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index e1031dce3e3124..8aedd664a78f9d 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -64,20 +64,9 @@ void MemTable::insert(Tuple* tuple) { auto cell = row.cell(i); const SlotDescriptor* slot = slots[(*_col_ids)[i]]; - // todo(kks): currently, HLL implementation don't have a merge method - // we should refactor HLL implementation and remove this special case handle - if (slot->type() == TYPE_HLL && _skip_list->Contains(_tuple_buf)) { - cell.set_not_null(); - const StringValue* src = tuple->get_string_slot(slot->tuple_offset()); - auto* dest = (Slice*)(cell.cell_ptr()); - dest->size = src->len; - dest->data = _arena.Allocate(dest->size); - memcpy(dest->data, src->ptr, dest->size); - } else { - bool is_null = tuple->is_null(slot->null_indicator_offset()); - void* value = tuple->get_slot(slot->tuple_offset()); - _schema->column(i)->consume(&cell, (const char *)value, is_null, _skip_list->arena()); - } + bool is_null = tuple->is_null(slot->null_indicator_offset()); + void* value = tuple->get_slot(slot->tuple_offset()); + _schema->column(i)->consume(&cell, (const char *)value, is_null, _skip_list->arena()); } bool overwritten = false; diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index 227f4bdd73dc81..192de668e77cc3 100755 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -770,6 +770,8 @@ struct LargeIntVal : public AnyVal { } }; +// todo(kks): keep HllVal struct only for backward compatibility, we should remove it +// when doris 0.12 release struct HllVal : public StringVal { HllVal() : StringVal() { } diff --git a/be/test/exprs/CMakeLists.txt b/be/test/exprs/CMakeLists.txt index 9e0b042f0b4751..d7af9962a00430 100644 --- a/be/test/exprs/CMakeLists.txt +++ b/be/test/exprs/CMakeLists.txt @@ -30,4 +30,5 @@ ADD_BE_TEST(string_functions_test) ADD_BE_TEST(timestamp_functions_test) ADD_BE_TEST(percentile_approx_test) ADD_BE_TEST(bitmap_function_test) +ADD_BE_TEST(hll_function_test) #ADD_BE_TEST(in-predicate-test) diff --git a/be/test/exprs/hll_function_test.cpp b/be/test/exprs/hll_function_test.cpp new file mode 100644 index 00000000000000..68a6664b06569b --- /dev/null +++ b/be/test/exprs/hll_function_test.cpp @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "exprs/aggregate_functions.h" +#include "exprs/anyval_util.h" +#include "exprs/hll_function.h" +#include "testutil/function_utils.h" +#include "olap/hll.h" +#include "util/logging.h" + +#include + +namespace doris { + +StringVal convert_hll_to_string(FunctionContext* ctx, HyperLogLog& hll) { + std::string buf; + buf.resize(HLL_COLUMN_DEFAULT_LEN); + int size = hll.serialize((char*)buf.c_str()); + buf.resize(size); + return AnyValUtil::from_string_temp(ctx, buf); +} + +class HllFunctionsTest : public testing::Test { +public: + HllFunctionsTest() = default; + + void SetUp() { + utils = new FunctionUtils(); + ctx = utils->get_fn_ctx(); + } + void TearDown() { + delete utils; + } + +private: + FunctionUtils* utils; + FunctionContext* ctx; +}; + +TEST_F(HllFunctionsTest, hll_hash) { + StringVal input = AnyValUtil::from_string_temp(ctx, std::string("1024")); + StringVal result = HllFunctions::hll_hash(ctx, input); + + HyperLogLog hll((char*)result.ptr); + int64_t cardinality = hll.estimate_cardinality(); + int64_t expected = 1; + + ASSERT_EQ(expected, cardinality); +} + +TEST_F(HllFunctionsTest, hll_hash_null) { + StringVal input = StringVal::null(); + StringVal result = HllFunctions::hll_hash(ctx, input); + + HyperLogLog hll((char*)result.ptr); + int64_t cardinality = hll.estimate_cardinality(); + int64_t expected = 0; + + ASSERT_EQ(expected, cardinality); +} + +TEST_F(HllFunctionsTest, hll_update) { + StringVal dst; + HllFunctions::hll_init(ctx, &dst); + IntVal src1(1); + HllFunctions::hll_update(ctx, src1, &dst); + IntVal src2(1234567); + HllFunctions::hll_update(ctx, src2, &dst); + + BigIntVal result = HllFunctions::hll_finalize(ctx, dst); + BigIntVal expected(2); + ASSERT_EQ(expected, result); +} + +TEST_F(HllFunctionsTest, hll_merge) { + StringVal dst; + HllFunctions::hll_init(ctx, &dst); + + HyperLogLog hll1(1024); + StringVal src1 = convert_hll_to_string(ctx, hll1); + HllFunctions::hll_merge(ctx, src1, &dst); + + HyperLogLog hll2; + StringVal src2 = convert_hll_to_string(ctx, hll2); + HllFunctions::hll_merge(ctx, src2, &dst); + + StringVal serialized = HllFunctions::hll_serialize(ctx, dst); + HyperLogLog hll((char*)serialized.ptr); + + BigIntVal expected(1); + ASSERT_EQ(expected, hll.estimate_cardinality()); +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java index a5c9c715a19f5c..b81dda33b6a3c6 100644 --- a/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java +++ b/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java @@ -298,9 +298,9 @@ public void init() { private static final Map HLL_UNION_AGG_UPDATE_SYMBOL = ImmutableMap.builder() .put(Type.VARCHAR, - "20hll_union_agg_updateEPN9doris_udf15FunctionContextERKNS1_6HllValEPS4_") + "_ZN5doris12HllFunctions9hll_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_") .put(Type.HLL, - "20hll_union_agg_updateEPN9doris_udf15FunctionContextERKNS1_6HllValEPS4_") + "_ZN5doris12HllFunctions9hll_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_") .build(); private static final Map OFFSET_FN_INIT_SYMBOL = @@ -843,12 +843,12 @@ private void initAggregateBuiltins() { // NDV // ndv return string addBuiltin(AggregateFunction.createBuiltin("ndv", - Lists.newArrayList(t), Type.VARCHAR, Type.VARCHAR, - prefix + "8hll_initEPN9doris_udf15FunctionContextEPNS1_9StringValE", - prefix + HLL_UPDATE_SYMBOL.get(t), - prefix + "9hll_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", - null, - prefix + "12hll_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + Lists.newArrayList(t), Type.BIGINT, Type.VARCHAR, + "_ZN5doris12HllFunctions8hll_initEPN9doris_udf15FunctionContextEPNS1_9StringValE", + "_ZN5doris12HllFunctions" + HLL_UPDATE_SYMBOL.get(t), + "_ZN5doris12HllFunctions9hll_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", + "_ZN5doris12HllFunctions13hll_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + "_ZN5doris12HllFunctions12hll_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE", true, false, true)); // BITMAP_UNION_INT @@ -864,23 +864,23 @@ private void initAggregateBuiltins() { // HLL_UNION_AGG addBuiltin(AggregateFunction.createBuiltin("hll_union_agg", Lists.newArrayList(t), Type.BIGINT, Type.VARCHAR, - prefix + "18hll_union_agg_initEPN9doris_udf15FunctionContextEPNS1_6HllValE", - prefix + HLL_UNION_AGG_UPDATE_SYMBOL.get(t), - prefix + "19hll_union_agg_mergeEPN9doris_udf15FunctionContextERKNS1_6HllValEPS4_", + "_ZN5doris12HllFunctions8hll_initEPN9doris_udf15FunctionContextEPNS1_9StringValE", + HLL_UNION_AGG_UPDATE_SYMBOL.get(t), + "_ZN5doris12HllFunctions9hll_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", + "_ZN5doris12HllFunctions13hll_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + "_ZN5doris12HllFunctions12hll_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE", null, - prefix + "22hll_union_agg_finalizeEPN9doris_udf15FunctionContextERKNS1_6HllValE", - null, - prefix + "22hll_union_agg_finalizeEPN9doris_udf15FunctionContextERKNS1_6HllValE", + "_ZN5doris12HllFunctions12hll_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE", true, true, true)); // HLL_RAW_AGG addBuiltin(AggregateFunction.createBuiltin("hll_raw_agg", Lists.newArrayList(t), Type.HLL, Type.HLL, - prefix + "16hll_raw_agg_initEPN9doris_udf15FunctionContextEPNS1_6HllValE", - prefix + "18hll_raw_agg_updateEPN9doris_udf15FunctionContextERKNS1_6HllValEPS4_", - prefix + "17hll_raw_agg_mergeEPN9doris_udf15FunctionContextERKNS1_6HllValEPS4_", - null, - prefix + "20hll_raw_agg_finalizeEPN9doris_udf15FunctionContextERKNS1_6HllValE", + "_ZN5doris12HllFunctions8hll_initEPN9doris_udf15FunctionContextEPNS1_9StringValE", + "_ZN5doris12HllFunctions9hll_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", + "_ZN5doris12HllFunctions9hll_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", + "_ZN5doris12HllFunctions13hll_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + "_ZN5doris12HllFunctions13hll_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", true, false, true)); if (STDDEV_UPDATE_SYMBOL.containsKey(t)) { diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index 0ddea3cacef257..9fc365107682d6 100755 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -583,10 +583,10 @@ '_ZN5doris13JsonFunctions15json_path_closeEPN9doris_udf15FunctionContextENS2_18FunctionStateScopeE'], #hll function - [['hll_cardinality'], 'BIGINT', ['HLL'], - '_ZN5doris16HllHashFunctions15hll_cardinalityEPN9doris_udf15FunctionContextERKNS1_6HllValE'], + [['hll_cardinality'], 'BIGINT', ['VARCHAR'], + '_ZN5doris12HllFunctions15hll_cardinalityEPN9doris_udf15FunctionContextERKNS1_9StringValE'], [['hll_hash'], 'VARCHAR', ['VARCHAR'], - '_ZN5doris16HllHashFunctions8hll_hashEPN9doris_udf15FunctionContextERKNS1_9StringValE'], + '_ZN5doris12HllFunctions8hll_hashEPN9doris_udf15FunctionContextERKNS1_9StringValE'], #bitmap function diff --git a/run-ut.sh b/run-ut.sh index f8c4cffa3246e1..6c5a23944baeb6 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -168,6 +168,7 @@ ${DORIS_TEST_BINARY_DIR}/exprs/json_function_test ${DORIS_TEST_BINARY_DIR}/exprs/timestamp_functions_test ${DORIS_TEST_BINARY_DIR}/exprs/percentile_approx_test ${DORIS_TEST_BINARY_DIR}/exprs/bitmap_function_test +${DORIS_TEST_BINARY_DIR}/exprs/hll_function_test ## Running geo unit test ${DORIS_TEST_BINARY_DIR}/geo/geo_functions_test From 92c91faaaad6c71e22c59e96ee7865416d324474 Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Mon, 9 Sep 2019 14:30:49 +0800 Subject: [PATCH 2/2] update for comment --- be/src/exprs/hll_function.cpp | 3 +- be/src/olap/hll.cpp | 208 ++++++++++++++++++++++++++ be/src/olap/hll.h | 274 +++++----------------------------- 3 files changed, 247 insertions(+), 238 deletions(-) diff --git a/be/src/exprs/hll_function.cpp b/be/src/exprs/hll_function.cpp index 1769a211f6a6cb..d8c759bd57c641 100644 --- a/be/src/exprs/hll_function.cpp +++ b/be/src/exprs/hll_function.cpp @@ -32,12 +32,13 @@ StringVal HllFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) { const int HLL_SINGLE_VALUE_SIZE = 10; const int HLL_EMPTY_SIZE = 1; std::string buf; - std::unique_ptr hll {new HyperLogLog()}; + std::unique_ptr hll; if (!input.is_null) { uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED); hll.reset(new HyperLogLog(hash_value)); buf.resize(HLL_SINGLE_VALUE_SIZE); } else { + hll.reset(new HyperLogLog()); buf.resize(HLL_EMPTY_SIZE); } hll->serialize((char*)buf.c_str()); diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp index 3e3f6861b46bb0..192bbc02f8c614 100644 --- a/be/src/olap/hll.cpp +++ b/be/src/olap/hll.cpp @@ -27,6 +27,214 @@ using std::stringstream; namespace doris { +HyperLogLog::HyperLogLog(char* src) { + _type = (HllDataType)src[0]; + memset(_registers, 0, HLL_REGISTERS_COUNT); + char* sparse_data = nullptr; + switch (_type) { + case HLL_DATA_EXPLICIT: + // first byte : type + // second~five byte : hash values's number + // five byte later : hash value + { + auto _explicit_num = (uint8_t) (src[sizeof(SetTypeValueType)]); + auto *_explicit_value = (uint64_t *) (src + sizeof(SetTypeValueType) + sizeof(uint8_t)); + for (int i = 0; i < _explicit_num; ++i) { + _hash_set.insert(_explicit_value[i]); + } + } + break; + case HLL_DATA_SPRASE: + // first byte : type + // second ~(2^HLL_COLUMN_PRECISION)/8 byte : bitmap mark which is not zero + // 2^HLL_COLUMN_PRECISION)/8 + 1以后value + { + auto* _sparse_count = (SparseLengthValueType*)(src + sizeof (SetTypeValueType)); + sparse_data = src + sizeof(SetTypeValueType) + sizeof(SparseLengthValueType); + std::map _sparse_map; + for (int i = 0; i < *_sparse_count; i++) { + auto* index = (SparseIndexType*)sparse_data; + sparse_data += sizeof(SparseIndexType); + auto* value = (SparseValueType*)sparse_data; + _sparse_map[*index] = *value; + sparse_data += sizeof(SetTypeValueType); + } + + for (auto iter: _sparse_map) { + _registers[iter.first] = (uint8_t)iter.second; + } + } + break; + case HLL_DATA_FULL: + // first byte : type + // second byte later : hll register value + { + char* _full_value_position = src + sizeof (SetTypeValueType); + memcpy(_registers, _full_value_position, HLL_REGISTERS_COUNT); + } + break; + case HLL_DATA_EMPTY: + break; + default: + break; + } +} + +void HyperLogLog::merge(const HyperLogLog& other) { + if (other._type == HLL_DATA_EMPTY) { + return; + } + + // _type must change + if (_type == HLL_DATA_EMPTY) { + _type = other._type; + switch (other._type) { + case HLL_DATA_EXPLICIT: + _hash_set = other._hash_set; + return; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); + return; + default: + return; + } + } + + // _type maybe change + if (_type == HLL_DATA_EXPLICIT) { + switch (other._type) { + case HLL_DATA_EXPLICIT: + _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); + return; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); + _type = other._type; + return; + default: + return; + } + } + + // _type maybe change + if (_type == HLL_DATA_SPRASE) { + switch (other._type) { + case HLL_DATA_EXPLICIT: + _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); + return; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + merge_registers(_registers, other._registers); + _type = other._type; + return; + default: + return; + } + } + + // _type not change + if (_type == HLL_DATA_FULL) { + switch (other._type) { + case HLL_DATA_EXPLICIT: + _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); + return; + case HLL_DATA_SPRASE: + case HLL_DATA_FULL: + merge_registers(_registers, other._registers); + return; + default: + return; + } + } +} + +int HyperLogLog::serialize(char* dest) { + if (_type == HLL_DATA_EMPTY) { + dest[0] = _type; + return 1; + } + + std::map index_to_value; + if (_type == HLL_DATA_SPRASE || _type == HLL_DATA_FULL || + _hash_set.size() > HLL_EXPLICLIT_INT64_NUM) { + merge_hash_set_to_registers(_registers, _hash_set); + for (int i = 0; i < HLL_REGISTERS_COUNT; i++) { + if (_registers[i] != 0) { + index_to_value[i] = _registers[i]; + } + } + } + + int sparse_set_len = index_to_value.size() * (sizeof(SparseIndexType) + + sizeof(SparseValueType) + sizeof(SparseLengthValueType)); + int result_len = 0; + if (sparse_set_len >= HLL_COLUMN_DEFAULT_LEN) { + result_len = serialize_full(dest, _registers); + } else if (index_to_value.size() > 0) { + result_len = serialize_sparse(dest, index_to_value); + } else if (_hash_set.size() > 0) { + result_len = serialize_explicit(dest, _hash_set); + } + + return result_len & 0xffff; +} + +int64_t HyperLogLog::estimate_cardinality() { + if (_type == HLL_DATA_EMPTY) { + return 0; + } + + merge_hash_set_to_registers(_registers, _hash_set); + + const int num_streams = HLL_REGISTERS_COUNT; + // Empirical constants for the algorithm. + float alpha = 0; + + if (num_streams == 16) { + alpha = 0.673f; + } else if (num_streams == 32) { + alpha = 0.697f; + } else if (num_streams == 64) { + alpha = 0.709f; + } else { + alpha = 0.7213f / (1 + 1.079f / num_streams); + } + + float harmonic_mean = 0; + int num_zero_registers = 0; + + for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { + harmonic_mean += powf(2.0f, -_registers[i]); + + if (_registers[i] == 0) { + ++num_zero_registers; + } + } + + harmonic_mean = 1.0f / harmonic_mean; + double estimate = alpha * num_streams * num_streams * harmonic_mean; + // according to HerperLogLog current correction, if E is cardinal + // E =< num_streams * 2.5 , LC has higher accuracy. + // num_streams * 2.5 < E , HerperLogLog has higher accuracy. + // Generally , we can use HerperLogLog to produce value as E. + if (estimate <= num_streams * 2.5 && num_zero_registers != 0) { + // Estimated cardinality is too low. Hll is too inaccurate here, instead use + // linear counting. + estimate = num_streams * log(static_cast(num_streams) / num_zero_registers); + } else if (num_streams == 16384 && estimate < 72000) { + // when Linear Couint change to HerperLoglog according to HerperLogLog Correction, + // there are relatively large fluctuations, we fixed the problem refer to redis. + double bias = 5.9119 * 1.0e-18 * (estimate * estimate * estimate * estimate) + - 1.4253 * 1.0e-12 * (estimate * estimate * estimate) + + 1.2940 * 1.0e-7 * (estimate * estimate) + - 5.2921 * 1.0e-3 * estimate + + 83.3216; + estimate -= estimate * (bias / 100); + } + return (int64_t)(estimate + 0.5); +} + void HllSetResolver::parse() { // skip LengthValueType char* pdata = _buf_ref; diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h index 2619dcf0c2dbed..e6506bd8481da4 100644 --- a/be/src/olap/hll.h +++ b/be/src/olap/hll.h @@ -58,63 +58,53 @@ class HyperLogLog { _hash_set.emplace(hash_value); } - explicit HyperLogLog(char* src) { - _type = (HllDataType)src[0]; - memset(_registers, 0, HLL_REGISTERS_COUNT); - char* sparse_data = nullptr; + explicit HyperLogLog(char* src); + + typedef uint8_t SetTypeValueType; + typedef int32_t SparseLengthValueType; + typedef uint16_t SparseIndexType; + typedef uint8_t SparseValueType; + + // change the _type to HLL_DATA_FULL directly has two reasons: + // 1. keep behavior consistent with before + // 2. make the code logic is simple + void update(const uint64_t hash_value) { + _type = HLL_DATA_FULL; + update_registers(_registers, hash_value); + } + + void merge(const HyperLogLog& other); + + int serialize(char* dest); + + int64_t estimate_cardinality(); + + // only for debug + std::string to_string() { switch (_type) { + case HLL_DATA_EMPTY: + return {}; case HLL_DATA_EXPLICIT: - // first byte : type - // second~five byte : hash values's number - // five byte later : hash value - { - auto _explicit_num = (uint8_t) (src[sizeof(SetTypeValueType)]); - auto *_explicit_value = (uint64_t *) (src + sizeof(SetTypeValueType) + sizeof(uint8_t)); - for (int i = 0; i < _explicit_num; ++i) { - _hash_set.insert(_explicit_value[i]); - } - } - break; case HLL_DATA_SPRASE: - // first byte : type - // second ~(2^HLL_COLUMN_PRECISION)/8 byte : bitmap mark which is not zero - // 2^HLL_COLUMN_PRECISION)/8 + 1以后value - { - auto* _sparse_count = (SparseLengthValueType*)(src + sizeof (SetTypeValueType)); - sparse_data = src + sizeof(SetTypeValueType) + sizeof(SparseLengthValueType); - std::map _sparse_map; - for (int i = 0; i < *_sparse_count; i++) { - auto* index = (SparseIndexType*)sparse_data; - sparse_data += sizeof(SparseIndexType); - auto* value = (SparseValueType*)sparse_data; - _sparse_map[*index] = *value; - sparse_data += sizeof(SetTypeValueType); - } - - for (auto iter: _sparse_map) { - _registers[iter.first] = (uint8_t)iter.second; - } - } - break; case HLL_DATA_FULL: - // first byte : type - // second byte later : hll register value { - char* _full_value_position = src + sizeof (SetTypeValueType); - memcpy(_registers, _full_value_position, HLL_REGISTERS_COUNT); + std::string str {"hash set size: "}; + str.append(std::to_string(_hash_set.size())); + str.append("\ncardinality:\t"); + str.append(std::to_string(estimate_cardinality())); + str.append("\ntype:\t"); + str.append(std::to_string(_type)); + return str; } - break; - case HLL_DATA_EMPTY: - break; default: - break; + return {}; } } - typedef uint8_t SetTypeValueType; - typedef int32_t SparseLengthValueType; - typedef uint16_t SparseIndexType; - typedef uint8_t SparseValueType; +private: + HllDataType _type; + char _registers[HLL_REGISTERS_COUNT]; + std::set _hash_set; static void update_registers(char* registers, uint64_t hash_value) { // Use the lower bits to index into the number of streams and then @@ -173,196 +163,6 @@ class HyperLogLog { len += sizeof(uint64_t) * hash_value_set.size(); return len; } - - // change the _type to HLL_DATA_FULL directly has two reasons: - // 1. keep behavior consistent with before - // 2. make the code logic is simple - void update(const uint64_t hash_value) { - _type = HLL_DATA_FULL; - update_registers(_registers, hash_value); - } - - void merge(const HyperLogLog& other) { - if (other._type == HLL_DATA_EMPTY) { - return; - } - - // _type must change - if (_type == HLL_DATA_EMPTY) { - _type = other._type; - switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set = other._hash_set; - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); - return; - default: - return; - } - } - - // _type maybe change - if (_type == HLL_DATA_EXPLICIT) { - switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); - _type = other._type; - return; - default: - return; - } - } - - // _type maybe change - if (_type == HLL_DATA_SPRASE) { - switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - merge_registers(_registers, other._registers); - _type = other._type; - return; - default: - return; - } - } - - // _type not change - if (_type == HLL_DATA_FULL) { - switch (other._type) { - case HLL_DATA_EXPLICIT: - _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); - return; - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - merge_registers(_registers, other._registers); - return; - default: - return; - } - } - } - - int serialize(char* dest) { - if (_type == HLL_DATA_EMPTY) { - dest[0] = _type; - return 1; - } - - std::map index_to_value; - if (_type == HLL_DATA_SPRASE || _type == HLL_DATA_FULL || - _hash_set.size() > HLL_EXPLICLIT_INT64_NUM) { - merge_hash_set_to_registers(_registers, _hash_set); - for (int i = 0; i < HLL_REGISTERS_COUNT; i++) { - if (_registers[i] != 0) { - index_to_value[i] = _registers[i]; - } - } - } - - int sparse_set_len = index_to_value.size() * (sizeof(SparseIndexType) - + sizeof(SparseValueType) + sizeof(SparseLengthValueType)); - int result_len = 0; - if (sparse_set_len >= HLL_COLUMN_DEFAULT_LEN) { - result_len = serialize_full(dest, _registers); - } else if (index_to_value.size() > 0) { - result_len = serialize_sparse(dest, index_to_value); - } else if (_hash_set.size() > 0) { - result_len = serialize_explicit(dest, _hash_set); - } - - return result_len & 0xffff;; - } - - int64_t estimate_cardinality() { - if (_type == HLL_DATA_EMPTY) { - return 0; - } - - merge_hash_set_to_registers(_registers, _hash_set); - - const int num_streams = HLL_REGISTERS_COUNT; - // Empirical constants for the algorithm. - float alpha = 0; - - if (num_streams == 16) { - alpha = 0.673f; - } else if (num_streams == 32) { - alpha = 0.697f; - } else if (num_streams == 64) { - alpha = 0.709f; - } else { - alpha = 0.7213f / (1 + 1.079f / num_streams); - } - - float harmonic_mean = 0; - int num_zero_registers = 0; - - for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { - harmonic_mean += powf(2.0f, -_registers[i]); - - if (_registers[i] == 0) { - ++num_zero_registers; - } - } - - harmonic_mean = 1.0f / harmonic_mean; - double estimate = alpha * num_streams * num_streams * harmonic_mean; - // according to HerperLogLog current correction, if E is cardinal - // E =< num_streams * 2.5 , LC has higher accuracy. - // num_streams * 2.5 < E , HerperLogLog has higher accuracy. - // Generally , we can use HerperLogLog to produce value as E. - if (estimate <= num_streams * 2.5 && num_zero_registers != 0) { - // Estimated cardinality is too low. Hll is too inaccurate here, instead use - // linear counting. - estimate = num_streams * log(static_cast(num_streams) / num_zero_registers); - } else if (num_streams == 16384 && estimate < 72000) { - // when Linear Couint change to HerperLoglog according to HerperLogLog Correction, - // there are relatively large fluctuations, we fixed the problem refer to redis. - double bias = 5.9119 * 1.0e-18 * (estimate * estimate * estimate * estimate) - - 1.4253 * 1.0e-12 * (estimate * estimate * estimate) + - 1.2940 * 1.0e-7 * (estimate * estimate) - - 5.2921 * 1.0e-3 * estimate + - 83.3216; - estimate -= estimate * (bias / 100); - } - return (int64_t)(estimate + 0.5); - } - - // only for debug - std::string to_string() { - switch (_type) { - case HLL_DATA_EMPTY: - return {}; - case HLL_DATA_EXPLICIT: - case HLL_DATA_SPRASE: - case HLL_DATA_FULL: - { - std::string str {"hash set size: "}; - str.append(std::to_string(_hash_set.size())); - str.append("\ncardinality:\t"); - str.append(std::to_string(estimate_cardinality())); - str.append("\ntype:\t"); - str.append(std::to_string(_type)); - return str; - } - default: - return {}; - } - } - -private: - HllDataType _type; - char _registers[HLL_REGISTERS_COUNT]; - std::set _hash_set; }; // todo(kks): remove this when dpp_sink class was removed