Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "exprs/hll_hash_function.h"
#include "exprs/timezone_db.h"
#include "exprs/bitmap_function.h"
#include "exprs/hll_function.h"
#include "geo/geo_functions.h"
#include "olap/options.h"
#include "util/time.h"
Expand Down Expand Up @@ -272,6 +273,7 @@ void init_daemon(int argc, char** argv, const std::vector<StorePath>& paths) {
GeoFunctions::init();
TimezoneDatabase::init();
BitmapFunctions::init();
HllFunctions::init();

pthread_t tc_malloc_pid;
pthread_create(&tc_malloc_pid, NULL, tcmalloc_gc_thread, NULL);
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/csv_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,14 +672,14 @@ void CsvScanNode::hll_hash(const char* src, int len, std::string* result) {
std::string str(src, len);
if (str != "\\N") {
uint64_t hash = HashUtil::murmur_hash64A(src, len, HashUtil::MURMUR_SEED);
char buf[HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE];
char buf[10];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a const or a macro, better not to use a magic number

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I used a magic number only for simplicity,
because I think we can delete this class's code in the Doris 0.12 release.

// explicit set
buf[0] = HLL_DATA_EXPLICIT;
buf[1] = 1;
*((uint64_t*)(buf + 2)) = hash;
*result = std::string(buf, sizeof(buf));
} else {
char buf[HllHashFunctions::HLL_EMPTY_SET_SIZE];
char buf[1];
// empty set
buf[0] = HLL_DATA_EMPTY;
*result = std::string(buf, sizeof(buf));
Expand Down
8 changes: 1 addition & 7 deletions be/src/exprs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,4 @@ add_library(Exprs
agg_fn.cc
new_agg_fn_evaluator.cc
bitmap_function.cpp
)
#ADD_BE_TEST(json_function_test)
#ADD_BE_TEST(binary_predicate_test)
#ADD_BE_TEST(in_predicate_test)
#ADD_BE_TEST(expr-test)
#ADD_BE_TEST(hybird_set_test)
#ADD_BE_TEST(in-predicate-test)
hll_function.cpp)
28 changes: 3 additions & 25 deletions be/src/exprs/aggregate_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1147,14 +1147,11 @@ void AggregateFunctions::hll_update(FunctionContext* ctx, const T& src, StringVa
}

DCHECK(!dst->is_null);
DCHECK_EQ(dst->len, std::pow(2, HLL_COLUMN_PRECISION));
DCHECK_EQ(dst->len, HLL_REGISTERS_COUNT);
uint64_t hash_value = AnyValUtil::hash64_murmur(src, HashUtil::MURMUR_SEED);

if (hash_value != 0) {
// Use the lower bits to index into the number of streams and then
// find the first 1 bit after the index bits.
int idx = hash_value % dst->len;
// uint8_t first_one_bit = __buiHLL_LENltin_ctzl(hash_value >> HLL_PRECISION) + 1;
uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_COLUMN_PRECISION) + 1;
dst->ptr[idx] = std::max(dst->ptr[idx], first_one_bit);
}
Expand Down Expand Up @@ -1193,7 +1190,7 @@ void AggregateFunctions::hll_union_agg_update(FunctionContext* ctx,
return;
}
DCHECK(!dst->is_null);

dst->agg_parse_and_cal(src);
return ;
}
Expand All @@ -1203,7 +1200,7 @@ void AggregateFunctions::hll_union_agg_merge(FunctionContext* ctx, const HllVal&
DCHECK(!src.is_null);
DCHECK_EQ(dst->len, HLL_COLUMN_DEFAULT_LEN);
DCHECK_EQ(src.len, HLL_COLUMN_DEFAULT_LEN);

dst->agg_merge(src);
}

Expand All @@ -1214,25 +1211,6 @@ doris_udf::BigIntVal AggregateFunctions::hll_union_agg_finalize(doris_udf::Funct
return result;
}

// StringVal-typed overload: downcasts the slot to HllVal* and forwards to the
// HllVal overload of hll_union_agg_init.
void AggregateFunctions::hll_union_agg_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* slot) {
    auto* hll_slot = static_cast<HllVal*>(slot);
    hll_union_agg_init(ctx, hll_slot);
}

// StringVal-typed overload: casts both arguments to their HllVal counterparts
// and forwards to the HllVal overload of hll_union_agg_update.
void AggregateFunctions::hll_union_agg_update(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src,
                                              doris_udf::StringVal* dst) {
    const HllVal& hll_src = static_cast<const HllVal&>(src);
    auto* hll_dst = static_cast<HllVal*>(dst);
    hll_union_agg_update(ctx, hll_src, hll_dst);
}

// StringVal-typed overload: casts both arguments to their HllVal counterparts
// and forwards to the HllVal overload of hll_union_agg_merge.
void AggregateFunctions::hll_union_agg_merge(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src,
                                             doris_udf::StringVal* dst) {
    const HllVal& hll_src = static_cast<const HllVal&>(src);
    auto* hll_dst = static_cast<HllVal*>(dst);
    hll_union_agg_merge(ctx, hll_src, hll_dst);
}

// StringVal-typed overload: runs the HllVal finalize to obtain the BigIntVal
// result, then returns its value formatted as a decimal string.
doris_udf::StringVal AggregateFunctions::hll_union_agg_finalize(doris_udf::FunctionContext* ctx, const StringVal& src) {
    const HllVal& hll_src = static_cast<const HllVal&>(src);
    BigIntVal estimate = hll_union_agg_finalize(ctx, hll_src);
    return AnyValUtil::from_string_temp(ctx, std::to_string(estimate.val));
}

int64_t AggregateFunctions::hll_algorithm(uint8_t *pdata, int data_len) {
DCHECK_EQ(data_len, HLL_REGISTERS_COUNT);

Expand Down
47 changes: 13 additions & 34 deletions be/src/exprs/aggregate_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,6 @@ dst);
const doris_udf::DecimalV2Val& src,
doris_udf::StringVal* dst);

// static void decimal_avg_add_or_remove(doris_udf::FunctionContext* ctx,
// const doris_udf::DecimalVal& src,
// doris_udf::StringVal* dst, bool remove);
// static void decimal_avg_add_or_remove(doris_udf::FunctionContext* ctx,
// const doris_udf::DecimalVal& src,
// doris_udf::StringVal* dst) {
// return decimal_avg_add_or_remove(ctx, src, dst, false);
// }
static doris_udf::DecimalVal decimal_avg_get_value(doris_udf::FunctionContext* ctx,
const doris_udf::StringVal& val);
static doris_udf::DecimalV2Val decimalv2_avg_get_value(doris_udf::FunctionContext* ctx,
Expand Down Expand Up @@ -194,22 +186,6 @@ dst);
doris_udf::FunctionContext*,
const doris_udf::StringVal& src);

// Hyperloglog distinct estimate algorithm.
// See these papers for more details.
// 1) Hyperloglog: The analysis of a near-optimal cardinality estimation
// algorithm (2007)
// 2) HyperLogLog in Practice (paper from google with some improvements)
static void hll_init(doris_udf::FunctionContext*, doris_udf::StringVal* slot);
template <typename T>
static void hll_update(doris_udf::FunctionContext*, const T& src, doris_udf::StringVal* dst);
static void hll_merge(
doris_udf::FunctionContext*,
const doris_udf::StringVal& src,
doris_udf::StringVal* dst);
static doris_udf::StringVal hll_finalize(
doris_udf::FunctionContext*,
const doris_udf::StringVal& src);

// count and sum distinct algorithm in multi distinct
template <typename T>
static void count_or_sum_distinct_numeric_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* dst);
Expand Down Expand Up @@ -335,8 +311,19 @@ dst);
static void offset_fn_update(doris_udf::FunctionContext*, const T& src,
const doris_udf::BigIntVal&, const T&, T* dst);

// HLL value type calculate
// init sets buffer
// todo(kks): keep following HLL methods only for backward compatibility, we should remove these methods
// when doris 0.12 release
static void hll_init(doris_udf::FunctionContext*, doris_udf::StringVal* slot);
template <typename T>
static void hll_update(doris_udf::FunctionContext*, const T& src, doris_udf::StringVal* dst);
static void hll_merge(
doris_udf::FunctionContext*,
const doris_udf::StringVal& src,
doris_udf::StringVal* dst);
static doris_udf::StringVal hll_finalize(
doris_udf::FunctionContext*,
const doris_udf::StringVal& src);

static void hll_union_agg_init(doris_udf::FunctionContext*, doris_udf::HllVal* slot);
// fill all registers according to the HLL set type
static void hll_union_agg_update(doris_udf::FunctionContext*, const doris_udf::HllVal& src,
Expand All @@ -351,14 +338,6 @@ dst);
doris_udf::FunctionContext*,
const doris_udf::HllVal& src);

//for backward compatibility, we could remove the following four HLL methods after doris 0.11 version
static void hll_union_agg_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* slot);
static void hll_union_agg_update(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src,
doris_udf::StringVal* dst);
static void hll_union_agg_merge(doris_udf::FunctionContext* ctx,const doris_udf::StringVal& src,
doris_udf::StringVal* dst);
static doris_udf::StringVal hll_union_agg_finalize(doris_udf::FunctionContext* ctx, const StringVal& src);

// calculate result
static int64_t hll_algorithm(uint8_t *pdata, int data_len);
static int64_t hll_algorithm(const StringVal &dst) {
Expand Down
110 changes: 110 additions & 0 deletions be/src/exprs/hll_function.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exprs/hll_function.h"

#include "exprs/anyval_util.h"
#include "util/hash_util.hpp"

namespace doris {

using doris_udf::BigIntVal;
using doris_udf::StringVal;

// Initialization hook for the HLL functions.
// The body is intentionally empty: there is currently nothing to set up.
void HllFunctions::init() {
}

// Builds a serialized HLL value from a single input string.
//   - non-NULL input: the input bytes are hashed with murmur_hash64A (seed
//     HashUtil::MURMUR_SEED) and a HyperLogLog is constructed from that hash;
//     the output buffer is sized to 10 bytes.
//   - NULL input: a default-constructed HyperLogLog is used; the output
//     buffer is sized to 1 byte.
// The HyperLogLog is serialized into the buffer and the buffer is returned
// through AnyValUtil::from_string_temp(ctx, ...).
StringVal HllFunctions::hll_hash(FunctionContext* ctx, const StringVal& input) {
// NOTE(review): 10 presumably = 1 type byte + 1 count byte + 8-byte hash, and
// 1 = a lone type byte — confirm against HyperLogLog::serialize.
const int HLL_SINGLE_VALUE_SIZE = 10;
const int HLL_EMPTY_SIZE = 1;
std::string buf;
std::unique_ptr<HyperLogLog> hll;
if (!input.is_null) {
uint64_t hash_value = HashUtil::murmur_hash64A(input.ptr, input.len, HashUtil::MURMUR_SEED);
hll.reset(new HyperLogLog(hash_value));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this case we create HyperLogLog two times

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will Fix

buf.resize(HLL_SINGLE_VALUE_SIZE);
} else {
hll.reset(new HyperLogLog());
buf.resize(HLL_EMPTY_SIZE);
}
// NOTE(review): this writes through the pointer returned by c_str() after
// casting away const; the buffer was resized above so the bytes exist, but
// &buf[0] would avoid the const cast.
hll->serialize((char*)buf.c_str());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not serialize(std::string*) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because slice.data type is char* and many HLL methods use char*.

return AnyValUtil::from_string_temp(ctx, buf);
}

// Aggregate init: heap-allocates a default-constructed HyperLogLog and stores
// its address in dst->ptr, with dst->len set to sizeof(HyperLogLog).
// hll_finalize() and hll_serialize() in this file delete a pointer stored
// this way.
void HllFunctions::hll_init(FunctionContext*, StringVal* dst) {
    HyperLogLog* state = new HyperLogLog();
    dst->ptr = reinterpret_cast<uint8_t*>(state);
    dst->len = sizeof(HyperLogLog);
    dst->is_null = false;
}

// Aggregate update: hashes one input value with AnyValUtil::hash64_murmur and
// passes the hash to update() on the HyperLogLog whose address is stored in
// dst->ptr. NULL inputs, and inputs whose hash is 0, leave dst untouched.
template <typename T>
void HllFunctions::hll_update(FunctionContext*, const T& src, StringVal* dst) {
    if (src.is_null) {
        return;
    }

    uint64_t hash = AnyValUtil::hash64_murmur(src, HashUtil::MURMUR_SEED);
    if (hash == 0) {
        return;
    }

    reinterpret_cast<HyperLogLog*>(dst->ptr)->update(hash);
}
// Aggregate merge: constructs a temporary HyperLogLog from the bytes at
// src.ptr and merges it into the HyperLogLog whose address is stored in
// dst->ptr.
// NOTE(review): src is presumably a serialized HLL (as written by
// HyperLogLog::serialize) — confirm against the HyperLogLog(char*) constructor.
void HllFunctions::hll_merge(FunctionContext*, const StringVal& src, StringVal* dst) {
    HyperLogLog incoming(reinterpret_cast<char*>(src.ptr));
    reinterpret_cast<HyperLogLog*>(dst->ptr)->merge(incoming);
}

// Aggregate finalize: treats src.ptr as the address of a HyperLogLog object,
// reads its cardinality estimate, deletes the object, and returns the
// estimate as a BigIntVal. src.ptr must not be used after this call.
BigIntVal HllFunctions::hll_finalize(FunctionContext*, const StringVal& src) {
    HyperLogLog* state = reinterpret_cast<HyperLogLog*>(src.ptr);
    BigIntVal estimate(state->estimate_cardinality());
    delete state;
    return estimate;
}

// Scalar function: returns the cardinality estimate for one HLL value.
// A NULL input yields a NULL BigIntVal. Otherwise the result is obtained by
// running the aggregate pipeline once: hll_init, a single hll_merge of the
// input, then hll_finalize (which also frees the temporary state).
BigIntVal HllFunctions::hll_cardinality(FunctionContext* ctx, const StringVal& input) {
    if (input.is_null) {
        return BigIntVal::null();
    }

    StringVal state;
    hll_init(ctx, &state);
    hll_merge(ctx, input, &state);
    return hll_finalize(ctx, state);
}

// Aggregate serialize: treats src.ptr as the address of a HyperLogLog object
// and serializes it into a StringVal allocated from ctx with
// HLL_COLUMN_DEFAULT_LEN bytes. The StringVal is then resized to the length
// reported by serialize(), and the HyperLogLog object is deleted.
StringVal HllFunctions::hll_serialize(FunctionContext* ctx, const StringVal& src) {
    HyperLogLog* state = reinterpret_cast<HyperLogLog*>(src.ptr);
    StringVal serialized(ctx, HLL_COLUMN_DEFAULT_LEN);
    int written = state->serialize(reinterpret_cast<char*>(serialized.ptr));
    serialized.resize(ctx, written);
    delete state;
    return serialized;
}

// Explicit instantiations of hll_update, one per supported input value type.
// The template body is defined in this .cpp file rather than in the header,
// so each type used by callers needs an instantiation emitted here.
template void HllFunctions::hll_update(FunctionContext*, const BooleanVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const TinyIntVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const SmallIntVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const IntVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const BigIntVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const FloatVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const DoubleVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const StringVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const DateTimeVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const LargeIntVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const DecimalVal&, StringVal*);
template void HllFunctions::hll_update(FunctionContext*, const DecimalV2Val&, StringVal*);
}
44 changes: 44 additions & 0 deletions be/src/exprs/hll_function.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef DORIS_BE_SRC_QUERY_EXPRS_HLL_FUNCTION_H
#define DORIS_BE_SRC_QUERY_EXPRS_HLL_FUNCTION_H

#include "udf/udf.h"

namespace doris {

// Static entry points for the HLL scalar and aggregate functions.
//
// For the aggregate functions (hll_init / hll_update / hll_merge /
// hll_finalize / hll_serialize) the intermediate StringVal does not hold
// string bytes: its ptr field carries the address of a heap-allocated
// HyperLogLog object created by hll_init and deleted by hll_finalize or
// hll_serialize.
class HllFunctions {
public:
    // Initialization hook; the current implementation is empty.
    static void init();

    // Scalar: hashes `input` and returns a serialized HLL value for it.
    // A NULL input produces the serialized form of an empty HLL.
    static StringVal hll_hash(FunctionContext* ctx, const StringVal& input);

    // Aggregate init: allocates an empty HyperLogLog and stores it in `dst`.
    static void hll_init(FunctionContext*, StringVal* dst);

    // Aggregate update: adds the hash of `src` to the HyperLogLog held by
    // `dst`. NULL values are skipped. Defined in hll_function.cpp and
    // explicitly instantiated there for each supported value type.
    template <typename T>
    static void hll_update(FunctionContext*, const T& src, StringVal* dst);

    // Aggregate merge: builds a HyperLogLog from the bytes in `src` and merges
    // it into the HyperLogLog held by `dst`.
    static void hll_merge(FunctionContext*, const StringVal& src, StringVal* dst);

    // Aggregate finalize: returns the cardinality estimate of the HyperLogLog
    // held by `src` and deletes that object.
    static BigIntVal hll_finalize(FunctionContext*, const StringVal& src);

    // Aggregate serialize: returns the serialized bytes of the HyperLogLog
    // held by `src` (allocated from `ctx`) and deletes that object.
    static StringVal hll_serialize(FunctionContext* ctx, const StringVal& src);

    // Scalar: returns the cardinality estimate of the HLL value in `src`, or
    // NULL when `src` is NULL.
    static BigIntVal hll_cardinality(FunctionContext* ctx, const StringVal& src);
};
}

#endif
Loading