diff --git a/be/src/codegen/doris_ir.cpp b/be/src/codegen/doris_ir.cpp index e39492ccf4b1cb..4c9a369a5b8b3a 100644 --- a/be/src/codegen/doris_ir.cpp +++ b/be/src/codegen/doris_ir.cpp @@ -20,7 +20,6 @@ struct __float128; #include "codegen/codegen_anyval_ir.cpp" #include "exec/aggregation_node_ir.cpp" #include "exec/hash_join_node_ir.cpp" -#include "exprs/aggregate_functions.cpp" #include "exprs/cast_functions.cpp" #include "exprs/conditional_functions_ir.cpp" #include "exprs/decimal_operators.cpp" diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index b5ae0096f5ad2a..a8493b12076dfc 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -51,6 +51,7 @@ #include "exprs/json_functions.h" #include "exprs/hll_hash_function.h" #include "exprs/timezone_db.h" +#include "exprs/bitmap_function.h" #include "geo/geo_functions.h" #include "olap/options.h" #include "util/time.h" @@ -84,7 +85,7 @@ void* tcmalloc_gc_thread(void* dummy) { return NULL; } - + void* memory_maintenance_thread(void* dummy) { while (true) { sleep(config::memory_maintenance_sleep_time_s); @@ -94,20 +95,20 @@ void* memory_maintenance_thread(void* dummy) { if (env != nullptr) { BufferPool* buffer_pool = env->buffer_pool(); if (buffer_pool != nullptr) buffer_pool->Maintenance(); - + // The process limit as measured by our trackers may get out of sync with the // process usage if memory is allocated or freed without updating a MemTracker. // The metric is refreshed whenever memory is consumed or released via a MemTracker, // so on a system with queries executing it will be refreshed frequently. However // if the system is idle, we need to refresh the tracker occasionally since // untracked memory may be allocated or freed, e.g. by background threads. 
- if (env->process_mem_tracker() != nullptr && + if (env->process_mem_tracker() != nullptr && !env->process_mem_tracker()->is_consumption_metric_null()) { env->process_mem_tracker()->RefreshConsumptionFromMetric(); - } - } - } - + } + } + } + return NULL; } @@ -174,8 +175,8 @@ void* calculate_metrics(void* dummy) { } sleep(15); // 15 seconds - } - + } + return NULL; } @@ -270,6 +271,7 @@ void init_daemon(int argc, char** argv, const std::vector& paths) { ESFunctions::init(); GeoFunctions::init(); TimezoneDatabase::init(); + BitmapFunctions::init(); pthread_t tc_malloc_pid; pthread_create(&tc_malloc_pid, NULL, tcmalloc_gc_thread, NULL); diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 44b3007e440ddb..f8d0baaae55e49 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -252,6 +252,7 @@ Status OlapScanner::get_batch( state->batch_size() * _tuple_desc->byte_size()); bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size()); Tuple *tuple = reinterpret_cast(tuple_buf); + std::unique_ptr arena(new Arena()); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; { @@ -263,7 +264,7 @@ Status OlapScanner::get_batch( break; } // Read one row from reader - auto res = _reader->next_row_with_aggregation(&_read_row_cursor, eof); + auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), eof); if (res != OLAP_SUCCESS) { return Status::InternalError("Internal Error: read storage fail."); } @@ -328,6 +329,11 @@ Status OlapScanner::get_batch( slot->ptr = reinterpret_cast(v); } } + + // the memory allocate by arena has been copied, + // so we should release these memory immediately + arena.reset(new Arena()); + if (VLOG_ROW_IS_ON) { VLOG_ROW << "OlapScanner output row: " << Tuple::to_string(tuple, *_tuple_desc); } diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index 964e4c85d74438..9bbdd022735e08 100644 --- a/be/src/exprs/CMakeLists.txt +++ 
b/be/src/exprs/CMakeLists.txt @@ -64,6 +64,7 @@ add_library(Exprs hll_hash_function.cpp agg_fn.cc new_agg_fn_evaluator.cc + bitmap_function.cpp ) #ADD_BE_TEST(json_function_test) #ADD_BE_TEST(binary_predicate_test) diff --git a/be/src/exprs/bitmap_function.cpp b/be/src/exprs/bitmap_function.cpp new file mode 100644 index 00000000000000..55ea1e2ca15e4b --- /dev/null +++ b/be/src/exprs/bitmap_function.cpp @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exprs/bitmap_function.h" + +#include "exprs/anyval_util.h" +#include "util/bitmap.h" + +namespace doris { +void BitmapFunctions::init() { +} + +void BitmapFunctions::bitmap_init(FunctionContext* ctx, StringVal* dst) { + dst->is_null = false; + dst->len = sizeof(RoaringBitmap); + dst->ptr = (uint8_t*)new RoaringBitmap(); +} + +template +void BitmapFunctions::bitmap_update_int(FunctionContext* ctx, const T& src, StringVal* dst) { + if (src.is_null) { + return; + } + + auto* dst_bitmap = reinterpret_cast(dst->ptr); + dst_bitmap->update(src.val); +} + +BigIntVal BitmapFunctions::bitmap_finalize(FunctionContext* ctx, const StringVal& src) { + auto* src_bitmap = reinterpret_cast(src.ptr); + BigIntVal result(src_bitmap->cardinality()); + delete src_bitmap; + return result; +} + +void BitmapFunctions::bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst) { + RoaringBitmap src_bitmap = RoaringBitmap((char*)src.ptr); + auto* dst_bitmap = reinterpret_cast(dst->ptr); + dst_bitmap->merge(src_bitmap); +} + +BigIntVal BitmapFunctions::bitmap_count(FunctionContext* ctx, const StringVal& src) { + RoaringBitmap bitmap ((char*)src.ptr); + BigIntVal result(bitmap.cardinality()); + return result; +} + +// we assume the input src is a valid integer string +StringVal BitmapFunctions::to_bitmap(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src) { + std::unique_ptr bitmap {new RoaringBitmap()}; + if (!src.is_null) { + std::string tmp_str = std::string(reinterpret_cast(src.ptr), src.len) ; + bitmap->update(std::stoi(tmp_str)); + } + std::string buf; + buf.resize(bitmap->size()); + bitmap->serialize((char*)buf.c_str()); + return AnyValUtil::from_string_temp(ctx, buf); +} + +StringVal BitmapFunctions::bitmap_serialize(FunctionContext* ctx, const StringVal& src) { + auto* src_bitmap = reinterpret_cast(src.ptr); + StringVal result(ctx, src_bitmap->size()); + src_bitmap->serialize((char*)result.ptr); + delete src_bitmap; + return result; +} + 
+template void BitmapFunctions::bitmap_update_int( + FunctionContext* ctx, const TinyIntVal& src, StringVal* dst); +template void BitmapFunctions::bitmap_update_int( + FunctionContext* ctx, const SmallIntVal& src, StringVal* dst); +template void BitmapFunctions::bitmap_update_int( + FunctionContext* ctx, const IntVal& src, StringVal* dst); + +} diff --git a/be/src/exprs/bitmap_function.h b/be/src/exprs/bitmap_function.h new file mode 100644 index 00000000000000..65b866df65ee28 --- /dev/null +++ b/be/src/exprs/bitmap_function.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef DORIS_BE_SRC_QUERY_EXPRS_BITMAP_FUNCTION_H +#define DORIS_BE_SRC_QUERY_EXPRS_BITMAP_FUNCTION_H + +#include "udf/udf.h" + +namespace doris { + +class BitmapFunctions { +public: + static void init(); + static void bitmap_init(FunctionContext* ctx, StringVal* slot); + template + static void bitmap_update_int(FunctionContext* ctx, const T& src, StringVal* dst); + // the input src's ptr need to point a RoaringBitmap, this function will release the + // RoaringBitmap memory + static BigIntVal bitmap_finalize(FunctionContext* ctx, const StringVal& src); + + static void bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst); + static BigIntVal bitmap_count(FunctionContext* ctx, const StringVal& src); + + static StringVal bitmap_serialize(FunctionContext* ctx, const StringVal& src); + static StringVal to_bitmap(FunctionContext* ctx, const StringVal& src); +}; +} +#endif //DORIS_BE_SRC_QUERY_EXPRS_BITMAP_FUNCTION_H diff --git a/be/src/gutil/port.h b/be/src/gutil/port.h index e204de7bda5566..a09b1aa1eec666 100644 --- a/be/src/gutil/port.h +++ b/be/src/gutil/port.h @@ -645,10 +645,6 @@ inline void *aligned_malloc(size_t size, int minimum_alignment) { #endif } -inline void aligned_free(void *aligned_memory) { - free(aligned_memory); -} - #else // not GCC #define PRINTF_ATTRIBUTE(string_index, first_to_check) @@ -861,10 +857,6 @@ inline void *aligned_malloc(size_t size, int minimum_alignment) { return _aligned_malloc(size, minimum_alignment); } -inline void aligned_free(void *aligned_memory) { - _aligned_free(aligned_memory); -} - // ----- BEGIN VC++ STUBS & FAKE DEFINITIONS --------------------------------- // See http://en.wikipedia.org/wiki/IEEE_754 for details of diff --git a/be/src/olap/aggregate_func.cpp b/be/src/olap/aggregate_func.cpp index f217ad715d9500..a3e82697cfd411 100644 --- a/be/src/olap/aggregate_func.cpp +++ b/be/src/olap/aggregate_func.cpp @@ -23,12 +23,8 @@ template AggregateInfo::AggregateInfo(const Traits& traits) : 
_init_fn(traits.init), _update_fn(traits.update), - _merge_fn(traits.merge), _finalize_fn(traits.finalize), _agg_method(traits.agg_method) { - if (_merge_fn == nullptr) { - _merge_fn = _update_fn; - } } struct AggregateFuncMapHash { @@ -128,6 +124,9 @@ AggregateFuncResolver::AggregateFuncResolver() { // Hyperloglog Aggregate Function add_aggregate_mapping(); + + // Bitmap Aggregate Function + add_aggregate_mapping(); } AggregateFuncResolver::~AggregateFuncResolver() { diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h index c552c05e295736..ca5b3b63171efc 100644 --- a/be/src/olap/aggregate_func.h +++ b/be/src/olap/aggregate_func.h @@ -20,48 +20,44 @@ #include "olap/hll.h" #include "olap/types.h" #include "olap/row_cursor_cell.h" +#include "runtime/datetime_value.h" +#include "runtime/decimalv2_value.h" +#include "runtime/string_value.h" #include "util/arena.h" +#include "util/bitmap.h" namespace doris { -using AggeInitFunc = void (*)(char* dst, Arena* arena); +using AggInitFunc = void (*)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena); using AggUpdateFunc = void (*)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena); -using AggFinalizeFunc = void (*)(char* data, Arena* arena); +using AggFinalizeFunc = void (*)(RowCursorCell* src, Arena* arena); // This class contains information about aggregate operation. class AggregateInfo { public: - // Init function will initialize aggregation execute environment in dst. - // For example: for sum, we just initial dst to 0. For HLL column, it will - // allocate and init context used to compute HLL. + // todo(kks): Unify this AggregateInfo::init method and Field::agg_init method + + // Init function will initialize aggregation execute environment in dst with source + // and convert the source raw data to storage aggregate format // // Memory Note: For plain memory can be allocated from arena, whose lifetime // will last util finalize function is called. 
Memory allocated from heap should // be freed in finalize functioin to avoid memory leak. - inline void init(void* dst, Arena* arena) const { - _init_fn((char*)dst, arena); + inline void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const { + _init_fn(dst, src, src_null, arena); } + // Update aggregated intermediate data. Data stored in engine is aggregated. // Actually do the aggregate operation. dst is the context which is initialized // by init function, src is the current value which is to be aggregated. // For example: For sum, dst is the current sum, and src is the next value which // will be added to sum. - // This function usually is used when load function. - // + // Memory Note: Same with init function. inline void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const { _update_fn(dst, src, arena); } - // Merge aggregated intermediate data. Data stored in engine is aggregated, - // because storage has done some aggregate when loading or compaction. - // So this function is often used in read operation. - // - // Memory Note: Same with init function. - inline void merge(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const { - _merge_fn(dst, src, arena); - } - // Finalize function convert intermediate context into final format. For example: // For HLL type, finalize function will serialize the aggregate context into a slice. // For input src points to the context, and when function is finished, result will be @@ -70,17 +66,16 @@ class AggregateInfo { // Memory Note: All heap memory allocated in init and update function should be freed // before this function return. Memory allocated from arena will be still available // and will be freed by client. 
- inline void finalize(void* src, Arena* arena) const { - _finalize_fn((char*)src, arena); + inline void finalize(RowCursorCell* src, Arena* arena) const { + _finalize_fn(src, arena); } FieldAggregationMethod agg_method() const { return _agg_method; } private: - void (*_init_fn)(char* dst, Arena* arena); - AggUpdateFunc _update_fn = nullptr; - AggUpdateFunc _merge_fn = nullptr; - void (*_finalize_fn)(char* dst, Arena* arena); + void (*_init_fn)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena); + void (*_update_fn)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena); + void (*_finalize_fn)(RowCursorCell* src, Arena* arena); friend class AggregateFuncResolver; @@ -90,33 +85,80 @@ class AggregateInfo { FieldAggregationMethod _agg_method; }; +template struct BaseAggregateFuncs { - // Default init function will set to null - static void init(char* dst, Arena* arena) { - *reinterpret_cast(dst) = true; + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + dst->set_is_null(src_null); + if (src_null) { + return; + } + + const TypeInfo* _type_info = get_type_info(field_type); + _type_info->deep_copy_with_arena(dst->mutable_cell_ptr(), src, arena); } // Default update do nothing. static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { } - // For most aggregate method, its merge and update are same. If merge - // is same with update, keep merge nullptr to avoid duplicate code. - // AggregateInfo constructor will set merge function to update function - // when merge is nullptr. - AggUpdateFunc merge = nullptr; - // Default finalize do nothing. 
- static void finalize(char* src, Arena* arena) { + static void finalize(RowCursorCell* src, Arena* arena) { } }; template -struct AggregateFuncTraits : public BaseAggregateFuncs { +struct AggregateFuncTraits : public BaseAggregateFuncs { +}; + +template <> +struct AggregateFuncTraits : + public BaseAggregateFuncs { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + dst->set_is_null(src_null); + if (src_null) { + return; + } + + auto* decimal_value = reinterpret_cast(src); + auto* storage_decimal_value = reinterpret_cast(dst->mutable_cell_ptr()); + storage_decimal_value->integer = decimal_value->int_value(); + storage_decimal_value->fraction = decimal_value->frac_value(); + } +}; + +template <> +struct AggregateFuncTraits : + public BaseAggregateFuncs { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + dst->set_is_null(src_null); + if (src_null) { + return; + } + + auto* datetime_value = reinterpret_cast(src); + auto* storage_datetime_value = reinterpret_cast(dst->mutable_cell_ptr()); + *storage_datetime_value = datetime_value->to_olap_datetime(); + } +}; + +template <> +struct AggregateFuncTraits : + public BaseAggregateFuncs { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + dst->set_is_null(src_null); + if (src_null) { + return; + } + + auto* date_value = reinterpret_cast(src); + auto* storage_date_value = reinterpret_cast(dst->mutable_cell_ptr()); + *storage_date_value = static_cast(date_value->to_olap_date()); + } }; template -struct AggregateFuncTraits : public BaseAggregateFuncs { +struct AggregateFuncTraits : + public AggregateFuncTraits { typedef typename FieldTypeTraits::CppType CppType; static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -135,7 +177,8 @@ struct AggregateFuncTraits : public Base }; template <> -struct AggregateFuncTraits : public BaseAggregateFuncs { +struct AggregateFuncTraits : + public 
BaseAggregateFuncs { typedef typename FieldTypeTraits::CppType CppType; static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -161,7 +204,8 @@ struct AggregateFuncTraits }; template -struct AggregateFuncTraits : public BaseAggregateFuncs { +struct AggregateFuncTraits : + public AggregateFuncTraits { typedef typename FieldTypeTraits::CppType CppType; static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -180,7 +224,8 @@ struct AggregateFuncTraits : public Base }; template <> -struct AggregateFuncTraits : public BaseAggregateFuncs { +struct AggregateFuncTraits : + public BaseAggregateFuncs { typedef typename FieldTypeTraits::CppType CppType; static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -200,7 +245,8 @@ struct AggregateFuncTraits }; template -struct AggregateFuncTraits : public BaseAggregateFuncs { +struct AggregateFuncTraits : + public AggregateFuncTraits { typedef typename FieldTypeTraits::CppType CppType; static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -221,7 +267,8 @@ struct AggregateFuncTraits : public Base }; template <> -struct AggregateFuncTraits : public BaseAggregateFuncs { +struct AggregateFuncTraits : + public BaseAggregateFuncs { typedef typename FieldTypeTraits::CppType CppType; static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -245,7 +292,8 @@ struct AggregateFuncTraits }; template -struct AggregateFuncTraits : public BaseAggregateFuncs { +struct AggregateFuncTraits : + public AggregateFuncTraits { typedef typename FieldTypeTraits::CppType CppType; static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -258,7 +306,8 @@ struct AggregateFuncTraits : public }; template <> -struct AggregateFuncTraits : public BaseAggregateFuncs { +struct AggregateFuncTraits : + public AggregateFuncTraits { static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* 
arena) { bool dst_null = dst->is_null(); bool src_null = src.is_null(); @@ -281,29 +330,40 @@ struct AggregateFuncTraits }; template <> -struct AggregateFuncTraits - : public AggregateFuncTraits { +struct AggregateFuncTraits + : public AggregateFuncTraits { }; +// during data load, after the hll_hash function, the hll_union column won't be null, +// so when init or update is called for hll, the src is not null template <> -struct AggregateFuncTraits : public BaseAggregateFuncs { - static void init(char* dst, Arena* arena) { +struct AggregateFuncTraits { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { // TODO(zc): refactor HLL implementation - *reinterpret_cast(dst) = false; - Slice* slice = reinterpret_cast(dst + 1); - HllContext* context = *reinterpret_cast(slice->data - sizeof(HllContext*)); + DCHECK_EQ(src_null, false); + dst->set_not_null(); + auto* dest_slice = reinterpret_cast(dst->mutable_cell_ptr()); + char* mem = arena->Allocate(sizeof(HllContext)); + auto* context = new (mem) HllContext; HllSetHelper::init_context(context); + HllSetHelper::fill_set(src, context); context->has_value = true; + char* variable_ptr = arena->Allocate(sizeof(HllContext*) + HLL_COLUMN_DEFAULT_LEN); + *(size_t*)(variable_ptr) = (size_t)(context); + variable_ptr += sizeof(HllContext*); + dest_slice->data = variable_ptr; + dest_slice->size = HLL_COLUMN_DEFAULT_LEN; } static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { + DCHECK_EQ(src.is_null(), false); auto l_slice = reinterpret_cast(dst->mutable_cell_ptr()); auto context = *reinterpret_cast(l_slice->data - sizeof(HllContext*)); HllSetHelper::fill_set((const char*)src.cell_ptr(), context); } - static void finalize(char* data, Arena* arena) { - auto slice = reinterpret_cast(data); + static void finalize(RowCursorCell* src, Arena* arena) { + auto slice = reinterpret_cast(src->mutable_cell_ptr()); auto context = *reinterpret_cast(slice->data - sizeof(HllContext*)); std::map index_to_value; if 
(context->has_sparse_or_full || @@ -339,6 +399,50 @@ struct AggregateFuncTraitshash64_set; } }; +// during data load, after the bitmap_init function, the bitmap_union column won't be null, +// so when init or update is called for bitmap, the src is not null +template <> +struct AggregateFuncTraits { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + DCHECK_EQ(src_null, false); + dst->set_not_null(); + auto* src_slice = reinterpret_cast(src); + auto* dst_slice = reinterpret_cast(dst->mutable_cell_ptr()); + + dst_slice->size = sizeof(RoaringBitmap); + dst_slice->data = (char*)new RoaringBitmap(src_slice->data); + } + + static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { + DCHECK_EQ(src.is_null(), false); + + auto* dst_slice = reinterpret_cast(dst->mutable_cell_ptr()); + auto* src_slice = reinterpret_cast(src.cell_ptr()); + auto* dst_bitmap = reinterpret_cast(dst_slice->data); + + // fixme(kks): trick here, need improve + if (arena == nullptr) { // for query + RoaringBitmap src_bitmap = RoaringBitmap(src_slice->data); + dst_bitmap->merge(src_bitmap); + } else { // for stream load + auto* src_bitmap = reinterpret_cast(src_slice->data); + dst_bitmap->merge(*src_bitmap); + + delete src_bitmap; + } + } + + static void finalize(RowCursorCell* src, Arena *arena) { + auto *slice = reinterpret_cast(src->mutable_cell_ptr()); + auto *bitmap = reinterpret_cast(slice->data); + + slice->size = bitmap->size(); + slice->data = arena->Allocate(slice->size); + bitmap->serialize(slice->data); + + delete bitmap; + } +}; template struct AggregateTraits : public AggregateFuncTraits { diff --git a/be/src/olap/field.h b/be/src/olap/field.h index ccc5cbdae59d93..32ee098a1e418d 100644 --- a/be/src/olap/field.h +++ b/be/src/olap/field.h @@ -40,45 +40,16 @@ namespace doris { // User can use this class to access or deal with column data in memory. 
class Field { public: - static Field* create(const TabletColumn& column) { - return new Field(column); - } - - static Field* create_by_type(const FieldType& type) { - // create by type - return new Field(type); - } - - Field(const TabletColumn& column) + explicit Field(const TabletColumn& column) : _type_info(get_type_info(column.type())), - _agg_info(get_aggregate_info(column.aggregation(), column.type())), _key_coder(get_key_coder(column.type())), _index_size(column.index_length()), - _is_nullable(column.is_nullable()) { } - - Field(FieldType type) - : _type_info(get_type_info(type)), - _agg_info(get_aggregate_info(OLAP_FIELD_AGGREGATION_NONE, type)), - _key_coder(get_key_coder(type)), - _index_size(_type_info->size()), - _is_nullable(true) { + _is_nullable(column.is_nullable()), + _agg_info(get_aggregate_info(column.aggregation(), column.type())), + _length(column.length()) { } - Field(const FieldAggregationMethod& agg, const FieldType& type, bool is_nullable) - : _type_info(get_type_info(type)), - _agg_info(get_aggregate_info(agg, type)), - _key_coder(get_key_coder(type)), - _index_size(-1), - _is_nullable(is_nullable) { - } - - Field(const FieldAggregationMethod& agg, const FieldType& type, size_t index_size, bool is_nullable) - : _type_info(get_type_info(type)), - _agg_info(get_aggregate_info(agg, type)), - _key_coder(get_key_coder(type)), - _index_size(index_size), - _is_nullable(is_nullable) { - } + virtual ~Field() = default; inline size_t size() const { return _type_info->size(); } inline size_t field_size() const { return size() + 1; } @@ -91,8 +62,34 @@ class Field { _agg_info->update(dest, src, arena); } - void agg_finalize(RowCursorCell* dst, Arena* arena = nullptr) const { - _agg_info->finalize(dst->mutable_cell_ptr(), arena); + inline void agg_finalize(RowCursorCell* dst, Arena* arena) const { + _agg_info->finalize(dst, arena); + } + + virtual void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const { + 
_agg_info->init(dst, src, src_null, arena); + } + + // todo(kks): Unify AggregateInfo::init method and Field::agg_init method + + // This function will initialize destination with source. + // This function differs from the copy function in that if this field + // contains aggregate information, this function will initialize + // destination in aggregate format, and update with source content. + virtual void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const { + direct_copy(dst, src); + } + + virtual char* allocate_memory(char* cell_ptr, char* variable_ptr) const { + return variable_ptr; + } + + virtual size_t get_variable_len() const { + return 0; + } + + virtual Field* clone() const { + return new Field(*this); } // Test if these two cell is equal with each other @@ -186,13 +183,6 @@ class Field { _type_info->deep_copy_with_arena(dst->mutable_cell_ptr(), src.cell_ptr(), arena); } - // This function will initialize destination with source. - // This functionn differs copy functionn in that if this filed - // contain aggregate information, this functionn will initialize - // destination in aggregate format, and update with srouce content. 
- template - void agg_init(DstCellType* dst, const SrcCellType& src) const; - // deep copy filed content from `src` to `dst` without null-byte inline void deep_copy_content(char* dst, const char* src, MemPool* mem_pool) const { _type_info->deep_copy(dst, src, mem_pool); @@ -241,6 +231,7 @@ class Field { uint32_t hash_code(const CellType& cell, uint32_t seed) const; FieldType type() const { return _type_info->type(); } + FieldAggregationMethod aggregation() const { return _agg_info->agg_method();} const TypeInfo* type_info() const { return _type_info; } bool is_nullable() const { return _is_nullable; } @@ -254,10 +245,15 @@ class Field { private: // Field的最大长度,单位为字节,通常等于length, 变长字符串不同 const TypeInfo* _type_info; - const AggregateInfo* _agg_info; const KeyCoder* _key_coder; uint16_t _index_size; bool _is_nullable; + +protected: + const AggregateInfo* _agg_info; + // 长度,单位为字节 + // 除字符串外,其它类型都是确定的 + uint32_t _length; }; template @@ -307,32 +303,6 @@ int Field::index_cmp(const LhsCellType& lhs, const RhsCellType& rhs) const { return res; } -template -void Field::agg_init(DstCellType* dst, const SrcCellType& src) const { - // TODO(zc): This function is also used to initialize key columns. - // So, refactor this in later PR - if (OLAP_LIKELY(type() != OLAP_FIELD_TYPE_HLL)) { - direct_copy(dst, src); - } else { - bool is_null = src.is_null(); - // TODO(zc): If source is null, can we set this to null? 
- // I'm not sure, just set like old code - dst->set_is_null(is_null); - - Slice* slice = reinterpret_cast(dst->mutable_cell_ptr()); - size_t hll_ptr = *(size_t*)(slice->data - sizeof(HllContext*)); - HllContext* context = (reinterpret_cast(hll_ptr)); - HllSetHelper::init_context(context); -#if 0 - if (is_null) { - return; - } -#endif - HllSetHelper::fill_set((const char*)src.cell_ptr(), context); - context->has_value = true; - } -} - template void Field::to_index(DstCellType* dst, const SrcCellType& src) const { bool is_null = src.is_null(); @@ -368,6 +338,155 @@ uint32_t Field::hash_code(const CellType& cell, uint32_t seed) const { return _type_info->hash_code(cell.cell_ptr(), seed); } +class CharField: public Field { +public: + explicit CharField(const TabletColumn& column) : Field(column) { + } + + // the char field is special: it needs the _length info when consuming raw data + void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const override { + dst->set_is_null(src_null); + if (src_null) { + return; + } + + auto* value = reinterpret_cast(src); + auto* dest_slice = (Slice*)(dst->mutable_cell_ptr()); + dest_slice->size = _length; + dest_slice->data = arena->Allocate(dest_slice->size); + memcpy(dest_slice->data, value->ptr, value->len); + memset(dest_slice->data + value->len, 0, dest_slice->size - value->len); + } + + size_t get_variable_len() const override { + return _length; + } + + char* allocate_memory(char* cell_ptr, char* variable_ptr) const override { + auto slice = (Slice*)cell_ptr; + slice->data = variable_ptr; + slice->size = _length; + variable_ptr += slice->size; + return variable_ptr; + } + + CharField* clone() const override { + return new CharField(*this); + } +}; + +class VarcharField: public Field { +public: + explicit VarcharField(const TabletColumn& column) : Field(column) { + } + + size_t get_variable_len() const override { + return _length - OLAP_STRING_MAX_BYTES; + } + + char* allocate_memory(char* cell_ptr, 
char* variable_ptr) const override { + auto slice = (Slice*)cell_ptr; + slice->data = variable_ptr; + slice->size = _length - OLAP_STRING_MAX_BYTES; + variable_ptr += slice->size; + return variable_ptr; + } + + VarcharField* clone() const override { + return new VarcharField(*this); + } +}; + +class BitmapAggField: public Field { +public: + explicit BitmapAggField(const TabletColumn& column) : Field(column) { + } + + // bitmap storage data always not null + void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const override { + _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena); + } + + char* allocate_memory(char* cell_ptr, char* variable_ptr) const override { + auto slice = (Slice*)cell_ptr; + slice->data = nullptr; + return variable_ptr; + } + + BitmapAggField* clone() const override { + return new BitmapAggField(*this); + } +}; + +class HllAggField: public Field { +public: + explicit HllAggField(const TabletColumn& column) : Field(column) { + } + + // Hll storage data always not null + void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const override { + _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena); + } + + char* allocate_memory(char* cell_ptr, char* variable_ptr) const override { + auto slice = (Slice*)cell_ptr; + slice->data = nullptr; + return variable_ptr; + } + + HllAggField* clone() const override { + return new HllAggField(*this); + } +}; + +class FieldFactory { +public: + static Field* create(const TabletColumn& column) { + // for key column + if (column.is_key()) { + switch (column.type()) { + case OLAP_FIELD_TYPE_CHAR: + return new CharField(column); + case OLAP_FIELD_TYPE_VARCHAR: + return new VarcharField(column); + default: + return new Field(column); + } + } + + // for value column + switch (column.aggregation()) { + case OLAP_FIELD_AGGREGATION_NONE: + case OLAP_FIELD_AGGREGATION_SUM: + case OLAP_FIELD_AGGREGATION_MIN: + case OLAP_FIELD_AGGREGATION_MAX: + case 
OLAP_FIELD_AGGREGATION_REPLACE: + switch (column.type()) { + case OLAP_FIELD_TYPE_CHAR: + return new CharField(column); + case OLAP_FIELD_TYPE_VARCHAR: + return new VarcharField(column); + default: + return new Field(column); + } + case OLAP_FIELD_AGGREGATION_HLL_UNION: + return new HllAggField(column); + case OLAP_FIELD_AGGREGATION_BITMAP_UNION: + return new BitmapAggField(column); + case OLAP_FIELD_AGGREGATION_UNKNOWN: + LOG(WARNING) << "WOW! value column agg type is unknown"; + return nullptr; + } + LOG(WARNING) << "WOW! value column no agg type"; + return nullptr; + } + + static Field* create_by_type(const FieldType& type) { + TabletColumn column(OLAP_FIELD_AGGREGATION_NONE, type); + return create(column); + } +}; + } // namespace doris #endif // DORIS_BE_SRC_OLAP_FIELD_H diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 33085fe53c3817..baed5da6b046e2 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -63,84 +63,20 @@ void MemTable::insert(Tuple* tuple) { for (size_t i = 0; i < _col_ids->size(); ++i) { auto cell = row.cell(i); const SlotDescriptor* slot = slots[(*_col_ids)[i]]; - - if (tuple->is_null(slot->null_indicator_offset())) { - cell.set_null(); - continue; - } - cell.set_not_null(); - TypeDescriptor type = slot->type(); - switch (type.type) { - case TYPE_CHAR: { - const StringValue* src = tuple->get_string_slot(slot->tuple_offset()); - Slice* dest = (Slice*)(cell.cell_ptr()); - dest->size = _tablet_schema->column(i).length(); - dest->data = _arena.Allocate(dest->size); - memcpy(dest->data, src->ptr, src->len); - memset(dest->data + src->len, 0, dest->size - src->len); - break; - } - case TYPE_VARCHAR: { - const StringValue* src = tuple->get_string_slot(slot->tuple_offset()); - Slice* dest = (Slice*)(cell.cell_ptr()); - dest->size = src->len; - dest->data = _arena.Allocate(dest->size); - memcpy(dest->data, src->ptr, dest->size); - break; - } - case TYPE_HLL: { - const StringValue* src = 
tuple->get_string_slot(slot->tuple_offset()); - Slice* dest = (Slice*)(cell.cell_ptr()); - dest->size = src->len; - bool exist = _skip_list->Contains(_tuple_buf); - if (exist) { - dest->data = _arena.Allocate(dest->size); - memcpy(dest->data, src->ptr, dest->size); - } else { - dest->data = src->ptr; - char* mem = _arena.Allocate(sizeof(HllContext)); - HllContext* context = new (mem) HllContext; - HllSetHelper::init_context(context); - HllSetHelper::fill_set(reinterpret_cast(dest), context); - context->has_value = true; - char* variable_ptr = _arena.Allocate(sizeof(HllContext*) + HLL_COLUMN_DEFAULT_LEN); - *(size_t*)(variable_ptr) = (size_t)(context); - variable_ptr += sizeof(HllContext*); - dest->data = variable_ptr; - dest->size = HLL_COLUMN_DEFAULT_LEN; - } - break; - } - case TYPE_DECIMAL: { - DecimalValue* decimal_value = tuple->get_decimal_slot(slot->tuple_offset()); - decimal12_t* storage_decimal_value = reinterpret_cast(cell.mutable_cell_ptr()); - storage_decimal_value->integer = decimal_value->int_value(); - storage_decimal_value->fraction = decimal_value->frac_value(); - break; - } - case TYPE_DECIMALV2: { - DecimalV2Value* decimal_value = tuple->get_decimalv2_slot(slot->tuple_offset()); - decimal12_t* storage_decimal_value = reinterpret_cast(cell.mutable_cell_ptr()); - storage_decimal_value->integer = decimal_value->int_value(); - storage_decimal_value->fraction = decimal_value->frac_value(); - break; - } - case TYPE_DATETIME: { - DateTimeValue* datetime_value = tuple->get_datetime_slot(slot->tuple_offset()); - uint64_t* storage_datetime_value = reinterpret_cast(cell.mutable_cell_ptr()); - *storage_datetime_value = datetime_value->to_olap_datetime(); - break; - } - case TYPE_DATE: { - DateTimeValue* date_value = tuple->get_datetime_slot(slot->tuple_offset()); - uint24_t* storage_date_value = reinterpret_cast(cell.mutable_cell_ptr()); - *storage_date_value = static_cast(date_value->to_olap_date()); - break; - } - default: { - 
memcpy(cell.mutable_cell_ptr(), tuple->get_slot(slot->tuple_offset()), _schema->column_size(i)); - break; - } + + // todo(kks): currently, HLL implementation don't have a merge method + // we should refactor HLL implementation and remove this special case handle + if (slot->type() == TYPE_HLL && _skip_list->Contains(_tuple_buf)) { + cell.set_not_null(); + const StringValue* src = tuple->get_string_slot(slot->tuple_offset()); + auto* dest = (Slice*)(cell.cell_ptr()); + dest->size = src->len; + dest->data = _arena.Allocate(dest->size); + memcpy(dest->data, src->ptr, dest->size); + } else { + bool is_null = tuple->is_null(slot->null_indicator_offset()); + void* value = tuple->get_slot(slot->tuple_offset()); + _schema->column(i)->consume(&cell, (const char *)value, is_null, _skip_list->arena()); } } diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index be5c7225578a79..61a3fc6d5e1dd4 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -81,12 +81,13 @@ OLAPStatus Merger::merge() { has_error = true; } + std::unique_ptr arena(new Arena()); bool eof = false; row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); // The following procedure would last for long time, half of one day, etc. 
while (!has_error) { // Read one row into row_cursor - OLAPStatus res = reader.next_row_with_aggregation(&row_cursor, &eof); + OLAPStatus res = reader.next_row_with_aggregation(&row_cursor, arena.get(), &eof); if (OLAP_SUCCESS == res && eof) { VLOG(3) << "reader read to the end."; break; @@ -102,6 +103,10 @@ OLAPStatus Merger::merge() { break; } + // the memory allocate by arena has been copied, + // so we should release these memory immediately + arena.reset(new Arena()); + // Goto next row position in the row block being written ++_row_count; } diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index ecc1fdfbcac5a8..da815a3cc3e699 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -148,7 +148,8 @@ enum FieldAggregationMethod { OLAP_FIELD_AGGREGATION_MAX = 3, OLAP_FIELD_AGGREGATION_REPLACE = 4, OLAP_FIELD_AGGREGATION_HLL_UNION = 5, - OLAP_FIELD_AGGREGATION_UNKNOWN = 6 + OLAP_FIELD_AGGREGATION_UNKNOWN = 6, + OLAP_FIELD_AGGREGATION_BITMAP_UNION = 7 }; // 压缩算法类型 diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index d3bd0a5378d8ec..5553d5178d3791 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -337,7 +337,7 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return OLAP_SUCCESS; } -OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, bool* eof) { +OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) { if (UNLIKELY(_next_key == nullptr)) { *eof = true; return OLAP_SUCCESS; @@ -352,12 +352,12 @@ OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, bool* eof) { return OLAP_SUCCESS; } -OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, bool* eof) { +OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) { if (UNLIKELY(_next_key == nullptr)) { *eof = true; return OLAP_SUCCESS; } - init_row_with_others(row_cursor, *_next_key); + init_row_with_others(row_cursor, *_next_key, arena); int64_t merged_count = 0; do 
{ auto res = _collect_iter->next(&_next_key, &_next_delete_flag); @@ -381,11 +381,11 @@ OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, bool* eof) { ++merged_count; } while (true); _merged_rows += merged_count; - agg_finalize_row(_value_cids, row_cursor); + agg_finalize_row(_value_cids, row_cursor, arena); return OLAP_SUCCESS; } -OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, bool* eof) { +OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) { *eof = false; bool cur_delete_flag = false; do { @@ -395,7 +395,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, bool* eof) { } cur_delete_flag = _next_delete_flag; - init_row_with_others(row_cursor, *_next_key); + init_row_with_others(row_cursor, *_next_key, arena); int64_t merged_count = 0; while (NULL != _next_key) { @@ -411,12 +411,12 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, bool* eof) { // 1. DUP_KEYS keys type has no semantic to aggregate, // 2. to make cost of each scan round reasonable, we will control merged_count. if (_aggregation && merged_count > config::doris_scanner_row_num) { - agg_finalize_row(_value_cids, row_cursor); + agg_finalize_row(_value_cids, row_cursor, arena); break; } // break while can NOT doing aggregation if (!equal_row(_key_cids, *row_cursor, *_next_key)) { - agg_finalize_row(_value_cids, row_cursor); + agg_finalize_row(_value_cids, row_cursor, arena); break; } diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 2ae14ccfef2295..5698b71d307f53 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -121,8 +121,8 @@ class Reader { void close(); // Reader next row with aggregation. 
- OLAPStatus next_row_with_aggregation(RowCursor *row_cursor, bool *eof) { - return (this->*_next_row_func)(row_cursor, eof); + OLAPStatus next_row_with_aggregation(RowCursor *row_cursor, Arena* arena, bool *eof) { + return (this->*_next_row_func)(row_cursor, arena, eof); } uint64_t merged_rows() const { @@ -196,9 +196,9 @@ class Reader { OLAPStatus _init_load_bf_columns(const ReaderParams& read_params); - OLAPStatus _dup_key_next_row(RowCursor* row_cursor, bool* eof); - OLAPStatus _agg_key_next_row(RowCursor* row_cursor, bool* eof); - OLAPStatus _unique_key_next_row(RowCursor* row_cursor, bool* eof); + OLAPStatus _dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof); + OLAPStatus _agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof); + OLAPStatus _unique_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof); TabletSharedPtr tablet() { return _tablet; } @@ -229,7 +229,7 @@ class Reader { DeleteHandler _delete_handler; - OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, bool* eof) = nullptr; + OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, Arena* arena, bool* eof) = nullptr; bool _aggregation; bool _version_locked; diff --git a/be/src/olap/row.h b/be/src/olap/row.h index 3d4267a808c910..4f06403f9034b1 100644 --- a/be/src/olap/row.h +++ b/be/src/olap/row.h @@ -72,10 +72,10 @@ int compare_row(const LhsRowType& lhs, const RhsRowType& rhs) { // function will first initialize destination column and then update with source column // value. 
template -void init_row_with_others(DstRowType* dst, const SrcRowType& src) { +void init_row_with_others(DstRowType* dst, const SrcRowType& src, Arena* arena) { for (auto cid : dst->schema()->column_ids()) { auto dst_cell = dst->cell(cid); - dst->schema()->column(cid)->agg_init(&dst_cell, src.cell(cid)); + dst->schema()->column(cid)->agg_init(&dst_cell, src.cell(cid), arena); } } @@ -139,10 +139,10 @@ void agg_finalize_row(RowType* row, Arena* arena) { } template -void agg_finalize_row(const std::vector& ids, RowType* row) { +void agg_finalize_row(const std::vector& ids, RowType* row, Arena* arena) { for (uint32_t id : ids) { auto cell = row->cell(id); - row->schema()->column(id)->agg_finalize(&cell); + row->schema()->column(id)->agg_finalize(&cell, arena); } } diff --git a/be/src/olap/row_cursor.cpp b/be/src/olap/row_cursor.cpp index 8c1e55bd7788f2..08027e0d307058 100644 --- a/be/src/olap/row_cursor.cpp +++ b/be/src/olap/row_cursor.cpp @@ -32,9 +32,6 @@ RowCursor::RowCursor() : RowCursor::~RowCursor() { delete [] _owned_fixed_buf; - for (HllContext* context : hll_contexts) { - delete context; - } delete [] _variable_buf; } @@ -44,17 +41,14 @@ OLAPStatus RowCursor::_init(const std::vector& schema, _fixed_len = _schema->schema_size(); _variable_len = 0; for (auto cid : columns) { - const TabletColumn& column = schema[cid]; - FieldType type = column.type(); - if (type == OLAP_FIELD_TYPE_VARCHAR) { - _variable_len += column.length() - OLAP_STRING_MAX_BYTES; - } else if (type == OLAP_FIELD_TYPE_CHAR) { - _variable_len += column.length(); - } else if (type == OLAP_FIELD_TYPE_HLL) { - _variable_len += HLL_COLUMN_DEFAULT_LEN + sizeof(HllContext*); + if (_schema->column(cid) == nullptr) { + LOG(WARNING) << "Fail to create field."; + return OLAP_ERR_INIT_FAILED; } + _variable_len += column_schema(cid)->get_variable_len(); } + _fixed_len = _schema->schema_size(); _fixed_buf = new (nothrow) char[_fixed_len]; if (_fixed_buf == nullptr) { LOG(WARNING) << "Fail to malloc 
_fixed_buf."; @@ -184,31 +178,8 @@ OLAPStatus RowCursor::allocate_memory_for_string_type(const TabletSchema& schema char* fixed_ptr = _fixed_buf; char* variable_ptr = _variable_buf; for (auto cid : _schema->column_ids()) { - const TabletColumn& column = schema.column(cid); fixed_ptr = _fixed_buf + _schema->column_offset(cid); - FieldType type = column.type(); - if (type == OLAP_FIELD_TYPE_VARCHAR) { - Slice* slice = reinterpret_cast(fixed_ptr + 1); - slice->data = variable_ptr; - slice->size = column.length() - OLAP_STRING_MAX_BYTES; - variable_ptr += slice->size; - } else if (type == OLAP_FIELD_TYPE_CHAR) { - Slice* slice = reinterpret_cast(fixed_ptr + 1); - slice->data = variable_ptr; - slice->size = column.length(); - variable_ptr += slice->size; - } else if (type == OLAP_FIELD_TYPE_HLL) { - // slice.data points to serialized HLL, (slice.data - 8) points to HllContext object used to aggregate HLL - auto slice = reinterpret_cast(fixed_ptr + 1); - HllContext* context = new HllContext(); - hll_contexts.push_back(context); - - *(size_t*)(variable_ptr) = (size_t)(context); - variable_ptr += sizeof(HllContext*); - slice->data = variable_ptr; // serialized HLL will be populated when finalizing HllContext - slice->size = HLL_COLUMN_DEFAULT_LEN; - variable_ptr += slice->size; - } + variable_ptr = column_schema(cid)->allocate_memory(fixed_ptr + 1, variable_ptr); } return OLAP_SUCCESS; } diff --git a/be/src/olap/row_cursor.h b/be/src/olap/row_cursor.h index 3d4dce4e7b4adb..943dbbfda0e0e8 100644 --- a/be/src/olap/row_cursor.h +++ b/be/src/olap/row_cursor.h @@ -152,6 +152,7 @@ class RowCursor { const Schema* schema() const { return _schema.get(); } char* row_ptr() const { return _fixed_buf; } + private: // common init function OLAPStatus _init(const std::vector& schema, @@ -165,11 +166,9 @@ class RowCursor { char* _variable_buf = nullptr; size_t _variable_len; - std::vector hll_contexts; DISALLOW_COPY_AND_ASSIGN(RowCursor); }; - } // namespace doris #endif // 
DORIS_BE_SRC_OLAP_ROW_CURSOR_H diff --git a/be/src/olap/rowset/segment_v2/column_zone_map.cpp b/be/src/olap/rowset/segment_v2/column_zone_map.cpp index 50e901aa6404cd..909a9cfa4c2ba6 100644 --- a/be/src/olap/rowset/segment_v2/column_zone_map.cpp +++ b/be/src/olap/rowset/segment_v2/column_zone_map.cpp @@ -27,7 +27,7 @@ ColumnZoneMapBuilder::ColumnZoneMapBuilder(const TypeInfo* type_info) : _type_in PageBuilderOptions options; options.data_page_size = 0; _page_builder.reset(new BinaryPlainPageBuilder(options)); - _field.reset(Field::create_by_type(_type_info->type())); + _field.reset(FieldFactory::create_by_type(_type_info->type())); _max_string_value = _arena.Allocate(OLAP_STRING_MAX_LENGTH); _zone_map.min_value = _arena.Allocate(_type_info->size()); _zone_map.max_value = _arena.Allocate(_type_info->size()); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index fe1bcbe6c07d25..4f866003d8ab34 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -101,18 +101,18 @@ Status SegmentIterator::_init_short_key_range() { // Set up environment for the following seek. 
Status SegmentIterator::_prepare_seek() { - std::vector key_fields; + std::vector key_fields; std::set column_set; if (_opts.lower_bound != nullptr) { for (auto cid : _opts.lower_bound->schema()->column_ids()) { column_set.emplace(cid); - key_fields.push_back(*_opts.lower_bound->schema()->column(cid)); + key_fields.emplace_back(_opts.lower_bound->schema()->column(cid)); } } if (_opts.upper_bound != nullptr) { for (auto cid : _opts.upper_bound->schema()->column_ids()) { if (column_set.count(cid) == 0) { - key_fields.push_back(*_opts.upper_bound->schema()->column(cid)); + key_fields.emplace_back(_opts.upper_bound->schema()->column(cid)); column_set.emplace(cid); } } diff --git a/be/src/olap/schema.cpp b/be/src/olap/schema.cpp index 2859c752ca03c9..5e02b864c6ae1b 100644 --- a/be/src/olap/schema.cpp +++ b/be/src/olap/schema.cpp @@ -37,44 +37,56 @@ void Schema::copy_from(const Schema& other) { _col_ids = other._col_ids; _col_offsets = other._col_offsets; _cols.resize(other._cols.size(), nullptr); - for (auto cid : _col_ids) { - _cols[cid] = new Field(*other._cols[cid]); + _cols[cid] = other._cols[cid]->clone(); } } +void Schema::init_field(const std::vector& columns, + const std::vector& col_ids) { + _cols.resize(columns.size(), nullptr); + // we must make sure that the offset is the same with RowBlock's + std::unordered_set column_set(col_ids.begin(), col_ids.end()); + for (int cid = 0; cid < columns.size(); ++cid) { + if (column_set.find(cid) == column_set.end()) { + continue; + } + _cols[cid] = FieldFactory::create(columns[cid]); + } +} -void Schema::reset(const std::vector& cols, size_t num_key_columns) { - std::vector col_ids(cols.size()); - for (uint32_t cid = 0; cid < cols.size(); ++cid) { - col_ids[cid] = cid; +void Schema::init_field(const std::vector& cols, + const std::vector& col_ids) { + _cols.resize(cols.size(), nullptr); + // we must make sure that the offset is the same with RowBlock's + std::unordered_set column_set(col_ids.begin(), col_ids.end()); + 
for (int cid = 0; cid < cols.size(); ++cid) { + if (column_set.find(cid) == column_set.end()) { + continue; + } + _cols[cid] = cols[cid]->clone(); } - reset(cols, col_ids, num_key_columns); } -void Schema::reset(const std::vector& cols, - const std::vector& col_ids, +void Schema::init(const std::vector& col_ids, size_t num_key_columns) { _num_key_columns = num_key_columns; - _col_ids = col_ids; - _cols.resize(cols.size(), nullptr); - _col_offsets.resize(cols.size(), -1); + _col_offsets.resize(_cols.size(), -1); // we must make sure that the offset is the same with RowBlock's std::unordered_set column_set(_col_ids.begin(), _col_ids.end()); size_t offset = 0; - for (int cid = 0; cid < cols.size(); ++cid) { + for (int cid = 0; cid < _cols.size(); ++cid) { if (column_set.find(cid) == column_set.end()) { continue; } - _cols[cid] = new Field(cols[cid]); _col_offsets[cid] = offset; - // 1 for null byte offset += _cols[cid]->size() + 1; } + _schema_size = offset; } Schema::~Schema() { diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h index a2d208d4217b10..fe72c6d745b886 100644 --- a/be/src/olap/schema.h +++ b/be/src/olap/schema.h @@ -20,6 +20,7 @@ #include #include "olap/aggregate_func.h" +#include "olap/field.h" #include "olap/tablet_schema.h" #include "olap/types.h" #include "olap/field.h" @@ -28,6 +29,7 @@ namespace doris { + class RowBlockRow; // The class is used to represent row's format in memory. 
@@ -40,35 +42,53 @@ class RowBlockRow; class Schema { public: Schema(const TabletSchema& schema) { - std::vector cols; size_t num_key_columns = 0; - for (int i = 0; i < schema.num_columns(); ++i) { - const TabletColumn& column = schema.column(i); - cols.emplace_back(column.aggregation(), column.type(), column.index_length(), column.is_nullable()); + std::vector col_ids(schema.num_columns()); + std::vector columns(schema.num_columns()); + + for (uint32_t cid = 0; cid < schema.num_columns(); ++cid) { + col_ids[cid] = cid; + const TabletColumn& column = schema.column(cid); + columns[cid] = column; if (column.is_key()) { num_key_columns++; } } - reset(cols, num_key_columns); + init_field(columns, col_ids); + init(col_ids, num_key_columns); } Schema(const std::vector& columns, const std::vector& col_ids) { - std::vector cols; size_t num_key_columns = 0; - for (int i = 0; i < columns.size(); ++i) { - const TabletColumn& column = columns[i]; - cols.emplace_back(column.aggregation(), column.type(), column.index_length(), column.is_nullable()); - if (column.is_key()) { + for (const auto& i: columns) { + if (i.is_key()) { num_key_columns++; } } - reset(cols, col_ids, num_key_columns); + init_field(columns, col_ids); + init(col_ids, num_key_columns); + } + + Schema(const std::vector& columns, size_t num_key_columns) { + std::vector col_ids(columns.size()); + for (uint32_t cid = 0; cid < columns.size(); ++cid) { + col_ids[cid] = cid; + } + + init_field(columns, col_ids); + init(col_ids, num_key_columns); } - Schema(const std::vector& cols, size_t num_key_columns) { - reset(cols, num_key_columns); + Schema(const std::vector& cols, size_t num_key_columns) { + std::vector col_ids(cols.size()); + for (uint32_t cid = 0; cid < cols.size(); ++cid) { + col_ids[cid] = cid; + } + + init_field(cols, col_ids); + init(col_ids, num_key_columns); } Schema(const Schema&); @@ -78,16 +98,12 @@ class Schema { ~Schema(); - void reset(const std::vector& cols, size_t num_key_columns); - - void 
reset(const std::vector& cols, - const std::vector& col_ids, - size_t num_key_columns); - const std::vector& columns() const { return _cols; } const Field* column(int idx) const { return _cols[idx]; } - size_t num_key_columns() const { return _num_key_columns; } + size_t num_key_columns() const { + return _num_key_columns; + } size_t column_offset(ColumnId cid) const { return _col_offsets[cid]; @@ -110,11 +126,7 @@ class Schema { } size_t schema_size() const { - size_t size = _col_ids.size(); - for (auto cid : _col_ids) { - size += _cols[cid]->size(); - } - return size; + return _schema_size; } size_t num_columns() const { return _cols.size(); } @@ -125,6 +137,19 @@ class Schema { std::vector _col_ids; std::vector _col_offsets; size_t _num_key_columns; + size_t _schema_size; + + // init _cols member variable, must call before init method + void init_field(const std::vector& columns, + const std::vector& col_ids); + + // init _cols member variable, must call before init method + void init_field(const std::vector& cols, + const std::vector& col_ids); + + // init all member variables except _cols + void init(const std::vector& col_ids, + size_t num_key_columns); }; } // namespace doris diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 340e74940f934e..29e0b75adad1a0 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -568,6 +568,7 @@ bool RowBlockMerger::merge( uint64_t* merged_rows) { uint64_t tmp_merged_rows = 0; RowCursor row_cursor; + std::unique_ptr arena(new Arena()); if (row_cursor.init(_tablet->tablet_schema()) != OLAP_SUCCESS) { LOG(WARNING) << "fail to init row cursor."; goto MERGE_ERR; @@ -577,7 +578,7 @@ bool RowBlockMerger::merge( row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); while (_heap.size() > 0) { - init_row_with_others(&row_cursor, *(_heap.top().row_cursor)); + init_row_with_others(&row_cursor, *(_heap.top().row_cursor), arena.get()); if (!_pop_heap()) { goto 
MERGE_ERR; @@ -597,6 +598,10 @@ bool RowBlockMerger::merge( } agg_finalize_row(&row_cursor, nullptr); rowset_writer->add_row(row_cursor); + + // the memory allocate by arena has been copied, + // so we should release these memory immediately + arena.reset(new Arena()); } if (rowset_writer->flush() != OLAP_SUCCESS) { LOG(WARNING) << "failed to finalizing writer."; diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 9cc2fbadce4b40..07438320ada2ca 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -93,6 +93,8 @@ FieldAggregationMethod TabletColumn::get_aggregation_type_by_string(const std::s aggregation_type = OLAP_FIELD_AGGREGATION_REPLACE; } else if (0 == upper_str.compare("HLL_UNION")) { aggregation_type = OLAP_FIELD_AGGREGATION_HLL_UNION; + } else if (0 == upper_str.compare("BITMAP_UNION")) { + aggregation_type = OLAP_FIELD_AGGREGATION_BITMAP_UNION; } else { LOG(WARNING) << "invalid aggregation type string. [aggregation='" << str << "']"; aggregation_type = OLAP_FIELD_AGGREGATION_UNKNOWN; @@ -191,6 +193,9 @@ std::string TabletColumn::get_string_by_aggregation_type(FieldAggregationMethod case OLAP_FIELD_AGGREGATION_HLL_UNION: return "HLL_UNION"; + case OLAP_FIELD_AGGREGATION_BITMAP_UNION: + return "BITMAP_UNION"; + default: return "UNKNOWN"; } @@ -237,6 +242,13 @@ TabletColumn::TabletColumn(FieldAggregationMethod agg, FieldType type) { _type = type; } +TabletColumn::TabletColumn(FieldAggregationMethod agg, FieldType filed_type, bool is_nullable) { + _aggregation = agg; + _type = filed_type; + _length = get_type_info(filed_type)->size(); + _is_nullable = is_nullable; +} + OLAPStatus TabletColumn::init_from_pb(const ColumnPB& column) { _unique_id = column.unique_id(); _col_name = column.name(); diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index c5f74a74b27519..8b7b0f2c0c296c 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -30,6 +30,7 @@ class 
TabletColumn { public: TabletColumn(); TabletColumn(FieldAggregationMethod agg, FieldType type); + TabletColumn(FieldAggregationMethod agg, FieldType filed_type, bool is_nullable); OLAPStatus init_from_pb(const ColumnPB& column); OLAPStatus to_schema_pb(ColumnPB* column); diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 1f3cab67a0160a..43b5ad36bddd53 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -104,6 +104,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { } RowCursor row; + std::unique_ptr arena(new Arena()); res = row.init(tablet->tablet_schema(), reader_params.return_columns); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("failed to init row cursor. [res=%d]", res); @@ -114,7 +115,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { bool eof = false; uint32_t row_checksum = 0; while (true) { - OLAPStatus res = reader.next_row_with_aggregation(&row, &eof); + OLAPStatus res = reader.next_row_with_aggregation(&row, arena.get(), &eof); if (res == OLAP_SUCCESS && eof) { VLOG(3) << "reader reads to the end."; break; @@ -124,6 +125,9 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { } row_checksum = hash_row(row, row_checksum); + // the memory allocate by arena has been copied, + // so we should release these memory immediately + arena.reset(new Arena()); } LOG(INFO) << "success to finish compute checksum. 
checksum=" << row_checksum; diff --git a/be/src/olap/wrapper_field.cpp b/be/src/olap/wrapper_field.cpp index a1b251d174004a..b0f1f18571bc1b 100644 --- a/be/src/olap/wrapper_field.cpp +++ b/be/src/olap/wrapper_field.cpp @@ -30,7 +30,7 @@ WrapperField* WrapperField::create(const TabletColumn& column, uint32_t len) { return nullptr; } - Field* rep = Field::create(column); + Field* rep = FieldFactory::create(column); if (rep == nullptr) { return nullptr; } @@ -53,14 +53,14 @@ WrapperField* WrapperField::create(const TabletColumn& column, uint32_t len) { } WrapperField* WrapperField::create_by_type(const FieldType& type) { - Field* rep = Field::create_by_type(type); + Field* rep = FieldFactory::create_by_type(type); if (rep == nullptr) { return nullptr; } - bool is_string_type = (type == OLAP_FIELD_TYPE_CHAR - || type == OLAP_FIELD_TYPE_VARCHAR - || type == OLAP_FIELD_TYPE_HLL); - WrapperField* wrapper = new WrapperField(rep, 0, is_string_type); + bool is_string_type = (type == OLAP_FIELD_TYPE_CHAR + || type == OLAP_FIELD_TYPE_VARCHAR + || type == OLAP_FIELD_TYPE_HLL); + auto* wrapper = new WrapperField(rep, 0, is_string_type); return wrapper; } diff --git a/be/src/util/bitmap.h b/be/src/util/bitmap.h index 879038d1e7a40d..985dc2c13bd1fe 100644 --- a/be/src/util/bitmap.h +++ b/be/src/util/bitmap.h @@ -18,8 +18,11 @@ #ifndef DORIS_BE_SRC_COMMON_UITL_BITMAP_H #define DORIS_BE_SRC_COMMON_UITL_BITMAP_H +#include #include "util/bit_util.h" +#include "util/coding.h" #include "gutil/strings/fastmem.h" +#include namespace doris { @@ -126,7 +129,7 @@ class BitmapIterator { void Reset(const uint8_t* map, size_t num_bits) { offset_ = 0; - num_bits_ = num_bits_; + num_bits_ = num_bits; map_ = map; } @@ -248,6 +251,138 @@ class Bitmap { static const int64_t BIT_INDEX_MASK = 63; }; +// the wrapper class for RoaringBitmap +// todo(kks): improve for low cardinality set +class RoaringBitmap { +public: + RoaringBitmap() : _type(EMPTY) {} + + explicit RoaringBitmap(int32_t value): 
_int_value(value), _type(SINGLE){} + + // the src is the serialized bitmap data, the type could be EMPTY, SINGLE or BITMAP + explicit RoaringBitmap(const char* src) { + _type = (BitmapDataType)src[0]; + switch (_type) { + case EMPTY: + break; + case SINGLE: + _int_value = decode_fixed32_le(reinterpret_cast(src + 1)); + break; + case BITMAP: + _roaring = Roaring::read(src + 1); + } + } + + void update(const int32_t value) { + switch (_type) { + case EMPTY: + _int_value = value; + _type = SINGLE; + break; + case SINGLE: + _roaring.add(_int_value); + _roaring.add(value); + _type = BITMAP; + break; + case BITMAP: + _roaring.add(value); + } + } + + // specialty improve for empty bitmap and single int + // roaring bitmap add(int) is faster than merge(another bitmap) + // the _type maybe change: + // EMPTY -> SINGLE + // EMPTY -> BITMAP + // SINGLE -> BITMAP + void merge(const RoaringBitmap& bitmap) { + switch(bitmap._type) { + case EMPTY: + return; + case SINGLE: + update(bitmap._int_value); + return; + case BITMAP: + switch (_type) { + case EMPTY: + _roaring = bitmap._roaring; + _type = BITMAP; + break; + case SINGLE: + _roaring = bitmap._roaring; + _roaring.add(_int_value); + _type = BITMAP; + break; + case BITMAP: + _roaring |= bitmap._roaring; + } + return; + } + } + + int64_t cardinality() const { + switch (_type) { + case EMPTY: + return 0; + case SINGLE: + return 1; + case BITMAP: + return _roaring.cardinality(); + } + return 0; + } + + size_t size() { + switch (_type) { + case EMPTY: + return 1; + case SINGLE: + return sizeof(int32_t) + 1; + case BITMAP: + _roaring.runOptimize(); + return _roaring.getSizeInBytes() + 1; + } + return 1; + } + + //must call size() first + void serialize(char* dest) { + dest[0] = _type; + switch (_type) { + case EMPTY: + break; + case SINGLE: + encode_fixed32_le(reinterpret_cast(dest + 1), _int_value); + break; + case BITMAP: + _roaring.write(dest + 1); + } + } + + std::string toString() const { + switch (_type) { + case EMPTY: + 
return {}; + case SINGLE: + return std::to_string(_int_value); + case BITMAP: + return _roaring.toString(); + } + return {}; + } + +private: + enum BitmapDataType { + EMPTY = 0, + SINGLE = 1, // int32 + BITMAP = 2 + }; + + Roaring _roaring; + int32_t _int_value; + BitmapDataType _type; +}; + } #endif diff --git a/be/test/exprs/CMakeLists.txt b/be/test/exprs/CMakeLists.txt index 25c95aeb42ad32..9e0b042f0b4751 100644 --- a/be/test/exprs/CMakeLists.txt +++ b/be/test/exprs/CMakeLists.txt @@ -29,4 +29,5 @@ ADD_BE_TEST(hybird_set_test) ADD_BE_TEST(string_functions_test) ADD_BE_TEST(timestamp_functions_test) ADD_BE_TEST(percentile_approx_test) +ADD_BE_TEST(bitmap_function_test) #ADD_BE_TEST(in-predicate-test) diff --git a/be/test/exprs/bitmap_function_test.cpp b/be/test/exprs/bitmap_function_test.cpp new file mode 100644 index 00000000000000..b535549994025e --- /dev/null +++ b/be/test/exprs/bitmap_function_test.cpp @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exprs/aggregate_functions.h" +#include "exprs/anyval_util.h" +#include "exprs/bitmap_function.h" +#include +#include +#include "testutil/function_utils.h" +#include +#include "util/logging.h" + +#include + +namespace doris { + +StringVal convert_bitmap_to_string(FunctionContext* ctx, RoaringBitmap& bitmap) { + std::string buf; + buf.resize(bitmap.size()); + bitmap.serialize((char*)buf.c_str()); + return AnyValUtil::from_string_temp(ctx, buf); +} + +class BitmapFunctionsTest : public testing::Test { +public: + BitmapFunctionsTest() = default; + + void SetUp() { + utils = new FunctionUtils(); + ctx = utils->get_fn_ctx(); + } + void TearDown() { + delete utils; + } + +private: + FunctionUtils* utils; + FunctionContext* ctx; +}; + +TEST_F(BitmapFunctionsTest, to_bitmap) { + StringVal input = AnyValUtil::from_string_temp(ctx, std::string("1024")); + StringVal result = BitmapFunctions::to_bitmap(ctx, input); + + RoaringBitmap bitmap(1024); + StringVal expected = convert_bitmap_to_string(ctx, bitmap); + + ASSERT_EQ(expected, result); +} + +TEST_F(BitmapFunctionsTest, to_bitmap_null) { + StringVal input = StringVal::null(); + StringVal result = BitmapFunctions::to_bitmap(ctx, input); + + RoaringBitmap bitmap; + StringVal expected = convert_bitmap_to_string(ctx, bitmap); + + ASSERT_EQ(expected, result); +} + +TEST_F(BitmapFunctionsTest, bitmap_union_int) { + StringVal dst; + BitmapFunctions::bitmap_init(ctx, &dst); + IntVal src1(1); + BitmapFunctions::bitmap_update_int(ctx, src1, &dst); + IntVal src2(1234567); + BitmapFunctions::bitmap_update_int(ctx, src2, &dst); + + BigIntVal result = BitmapFunctions::bitmap_finalize(ctx, dst); + BigIntVal expected(2); + ASSERT_EQ(expected, result); +} + +TEST_F(BitmapFunctionsTest, bitmap_union) { + StringVal dst; + BitmapFunctions::bitmap_init(ctx, &dst); + + RoaringBitmap bitmap1(1024); + StringVal src1 = convert_bitmap_to_string(ctx, bitmap1); + BitmapFunctions::bitmap_union(ctx, src1, &dst); + + RoaringBitmap bitmap2; +
StringVal src2 = convert_bitmap_to_string(ctx, bitmap2); // merge the default-constructed bitmap2, not bitmap1 a second time + BitmapFunctions::bitmap_union(ctx, src2, &dst); + + StringVal serialized = BitmapFunctions::bitmap_serialize(ctx, dst); + + BigIntVal result = BitmapFunctions::bitmap_count(ctx, serialized); + BigIntVal expected(1); + ASSERT_EQ(expected, result); +} + +TEST_F(BitmapFunctionsTest, bitmap_count) { + RoaringBitmap bitmap(1024); + bitmap.update(1); + bitmap.update(2019); + StringVal bitmap_str = convert_bitmap_to_string(ctx, bitmap); + + BigIntVal result = BitmapFunctions::bitmap_count(ctx, bitmap_str); + BigIntVal expected(3); + ASSERT_EQ(expected, result); +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/aggregate_func_test.cpp b/be/test/olap/aggregate_func_test.cpp index cea73b3cf25946..7d4ca56d743aa4 100644 --- a/be/test/olap/aggregate_func_test.cpp +++ b/be/test/olap/aggregate_func_test.cpp @@ -38,14 +38,13 @@ void test_min() { Arena arena; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_MIN, field_type); - agg->init(buf, &arena); RowCursorCell dst(buf); // null { char val_buf[16]; *(bool*)val_buf = true; - agg->update(&dst, val_buf, &arena); + agg->init(&dst, val_buf, true, &arena); ASSERT_TRUE(*(bool*)(buf)); } // 100 @@ -92,7 +91,7 @@ void test_min() { memcpy(&val, buf + 1, sizeof(CppType)); ASSERT_EQ(50, val); } - agg->finalize(buf, &arena); + agg->finalize(&dst, &arena); ASSERT_FALSE(*(bool*)(buf)); CppType val; memcpy(&val, buf + 1, sizeof(CppType)); @@ -112,14 +111,13 @@ void test_max() { Arena arena; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_MAX, field_type); - agg->init(buf, &arena); RowCursorCell dst(buf); // null { char val_buf[16]; *(bool*)val_buf = true; - agg->update(&dst, val_buf, &arena); + agg->init(&dst, val_buf, true, &arena); ASSERT_TRUE(*(bool*)(buf)); } // 100 @@ -165,7 +163,7 @@ void test_max() { memcpy(&val, buf + 1, sizeof(CppType));
ASSERT_EQ(200, val); } - agg->finalize(buf, &arena); + agg->finalize(&dst, &arena); ASSERT_FALSE(*(bool*)(buf)); CppType val; memcpy(&val, buf + 1, sizeof(CppType)); @@ -186,13 +184,12 @@ void test_sum() { Arena arena; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_SUM, field_type); - agg->init(buf, &arena); // null { char val_buf[16]; *(bool*)val_buf = true; - agg->update(&dst, val_buf, &arena); + agg->init(&dst, val_buf, true, &arena); ASSERT_TRUE(*(bool*)(buf)); } // 100 @@ -238,7 +235,7 @@ void test_sum() { memcpy(&val, buf + 1, sizeof(CppType)); ASSERT_EQ(350, val); } - agg->finalize(buf, &arena); + agg->finalize(&dst, &arena); ASSERT_FALSE(*(bool*)(buf)); CppType val; memcpy(&val, buf + 1, sizeof(CppType)); @@ -259,13 +256,12 @@ void test_replace() { Arena arena; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_REPLACE, field_type); - agg->init(buf, &arena); // null { char val_buf[16]; *(bool*)val_buf = true; - agg->update(&dst, val_buf, &arena); + agg->init(&dst, val_buf, true, &arena); ASSERT_TRUE(*(bool*)(buf)); } // 100 @@ -297,7 +293,7 @@ void test_replace() { memcpy(&val, buf + 1, sizeof(CppType)); ASSERT_EQ(50, val); } - agg->finalize(buf, &arena); + agg->finalize(&dst, &arena); ASSERT_FALSE(*(bool*)(buf)); CppType val; memcpy(&val, buf + 1, sizeof(CppType)); @@ -317,7 +313,6 @@ void test_replace_string() { Arena arena; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_REPLACE, field_type); - agg->init(dst, &arena); char src[string_field_size]; RowCursorCell src_cell(src); @@ -325,7 +320,7 @@ void test_replace_string() { // null { src_cell.set_null(); - agg->update(&dst_cell, src_cell, &arena); + agg->init(&dst_cell, (const char*)src_slice, true, &arena); ASSERT_TRUE(dst_cell.is_null()); } // "12345" @@ -364,7 +359,8 @@ void test_replace_string() { ASSERT_EQ(5, dst_slice->size); ASSERT_STREQ("12345", dst_slice->to_string().c_str()); } - agg->finalize(dst, &arena); + + agg->finalize(&dst_cell, 
&arena); ASSERT_FALSE(dst_cell.is_null()); ASSERT_EQ(5, dst_slice->size); ASSERT_STREQ("12345", dst_slice->to_string().c_str()); diff --git a/be/test/olap/generic_iterators_test.cpp b/be/test/olap/generic_iterators_test.cpp index 25e6b4930af6ab..b9ad18c087cc05 100644 --- a/be/test/olap/generic_iterators_test.cpp +++ b/be/test/olap/generic_iterators_test.cpp @@ -36,9 +36,7 @@ class GenericIteratorsTest : public testing::Test { }; Schema create_schema() { - std::vector col_schemas; - - // c1: small int + std::vector col_schemas; col_schemas.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_SMALLINT, true); // c2: int col_schemas.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_INT, true); diff --git a/be/test/olap/row_cursor_test.cpp b/be/test/olap/row_cursor_test.cpp index 5339f58d43421d..0f7e06c32784b9 100644 --- a/be/test/olap/row_cursor_test.cpp +++ b/be/test/olap/row_cursor_test.cpp @@ -269,7 +269,7 @@ TEST_F(TestRowCursor, InitRowCursor) { OLAPStatus res = row.init(tablet_schema); ASSERT_EQ(res, OLAP_SUCCESS); ASSERT_EQ(row.get_fixed_len(), 126); - ASSERT_EQ(row.get_variable_len(), 16413); + ASSERT_EQ(row.get_variable_len(), 20); } TEST_F(TestRowCursor, InitRowCursorWithColumnCount) { @@ -447,6 +447,7 @@ TEST_F(TestRowCursor, AggregateWithoutNull) { set_tablet_schema_for_cmp_and_aggregate(&tablet_schema); RowCursor row; + std::unique_ptr arena(new Arena()); OLAPStatus res = row.init(tablet_schema); ASSERT_EQ(res, OLAP_SUCCESS); ASSERT_EQ(row.get_fixed_len(), 78); @@ -469,7 +470,7 @@ TEST_F(TestRowCursor, AggregateWithoutNull) { left.set_field_content(4, reinterpret_cast(&l_decimal), _mem_pool.get()); left.set_field_content(5, reinterpret_cast(&l_varchar), _mem_pool.get()); - init_row_with_others(&row, left); + init_row_with_others(&row, left, arena.get()); RowCursor right; res = right.init(tablet_schema); @@ -506,6 +507,7 @@ TEST_F(TestRowCursor, AggregateWithNull) { set_tablet_schema_for_cmp_and_aggregate(&tablet_schema); RowCursor row; + 
std::unique_ptr arena(new Arena()); OLAPStatus res = row.init(tablet_schema); ASSERT_EQ(res, OLAP_SUCCESS); ASSERT_EQ(row.get_fixed_len(), 78); @@ -526,7 +528,7 @@ TEST_F(TestRowCursor, AggregateWithNull) { left.set_null(4); left.set_field_content(5, reinterpret_cast(&l_varchar), _mem_pool.get()); - init_row_with_others(&row, left); + init_row_with_others(&row, left, arena.get()); RowCursor right; res = right.init(tablet_schema); diff --git a/docs/documentation/cn/sql-reference/sql-functions/aggregate-functions/bitmap.md b/docs/documentation/cn/sql-reference/sql-functions/aggregate-functions/bitmap.md new file mode 100644 index 00000000000000..36e81eb0eea1aa --- /dev/null +++ b/docs/documentation/cn/sql-reference/sql-functions/aggregate-functions/bitmap.md @@ -0,0 +1,69 @@ +# BITMAP + +## description +### Syntax + +`TO_BITMAP(expr)` : 将TINYINT,SMALLINT和INT类型的列转为Bitmap + +`BITMAP_UNION(expr)` : 计算两个Bitmap的并集,返回值是序列化后的Bitmap值 + +`BITMAP_COUNT(expr)` : 计算Bitmap中不同值的个数 + +`BITMAP_UNION_INT(expr)` : 计算TINYINT,SMALLINT和INT类型的列中不同值的个数,返回值和 +COUNT(DISTINCT expr)相同 + + +注意: + + 1. TO_BITMAP 函数输入的类型必须是TINYINT,SMALLINT,INT + 2.
BITMAP_UNION函数的参数目前仅支持: + - 聚合模型中聚合类型为BITMAP_UNION的列 + - TO_BITMAP 函数 + +## example + +``` +CREATE TABLE `bitmap_udaf` ( + `id` int(11) NULL COMMENT "", + `id2` int(11) +) ENGINE=OLAP +DUPLICATE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 10; + + +mysql> select bitmap_count(bitmap_union(to_bitmap(id2))) from bitmap_udaf; ++----------------------------------------------+ +| bitmap_count(bitmap_union(to_bitmap(`id2`))) | ++----------------------------------------------+ +| 6 | ++----------------------------------------------+ + +mysql> select bitmap_union_int (id2) from bitmap_udaf; ++-------------------------+ +| bitmap_union_int(`id2`) | ++-------------------------+ +| 6 | ++-------------------------+ + + + +CREATE TABLE `bitmap_test` ( + `id` int(11) NULL COMMENT "", + `id2` varchar(20) bitmap_union NULL +) ENGINE=OLAP +AGGREGATE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 10; + + +mysql> select bitmap_count(bitmap_union(id2)) from bitmap_test; ++-----------------------------------+ +| bitmap_count(bitmap_union(`id2`)) | ++-----------------------------------+ +| 8 | ++-----------------------------------+ + +``` + +## keyword + +BITMAP,BITMAP_COUNT,BITMAP_UNION,BITMAP_UNION_INT,TO_BITMAP diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/documentation/cn/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index 795f1bbc7cd9c1..43864f50f09b4e 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -51,11 +51,14 @@ 程度系统内控制,并且HLL列只能通过配套的hll_union_agg、Hll_cardinality、hll_hash进行查询或使用 agg_type:聚合类型,如果不指定,则该列为 key 列。否则,该列为 value 列 - SUM、MAX、MIN、REPLACE、HLL_UNION(仅用于HLL列,为HLL独有的聚合方式) + SUM、MAX、MIN、REPLACE、HLL_UNION(仅用于HLL列,为HLL独有的聚合方式)、BITMAP_UNION(列类型需要定义为VARCHAR(20)) 该类型只对聚合模型(key_desc的type为AGGREGATE KEY)有用,其它模型不需要指定这个。 是否允许为NULL: 默认不允许为 NULL。NULL 值在导入数据中用 \N 来表示 + 注意: + 
BITMAP_UNION聚合类型列在导入时的原始数据类型必须是TINYINT,SMALLINT,INT。 + 2. ENGINE 类型 默认为 olap。可选 mysql, broker 1) 如果是 mysql,则需要在 properties 提供以下信息: @@ -293,7 +296,20 @@ DISTRIBUTED BY HASH(k1) BUCKETS 32 PROPERTIES ("storage_type"="column"); - 7. 创建两张支持Colocat Join的表t1 和t2 + 7. 创建一张含有BITMAP_UNION聚合类型的表(v1和v2列的原始数据类型必须是TINYINT,SMALLINT,INT) + CREATE TABLE example_db.example_table + ( + k1 TINYINT, + k2 DECIMAL(10, 2) DEFAULT "10.5", + v1 VARCHAR(20) BITMAP_UNION, + v2 VARCHAR(20) BITMAP_UNION + ) + ENGINE=olap + AGGREGATE KEY(k1, k2) + DISTRIBUTED BY HASH(k1) BUCKETS 32 + PROPERTIES ("storage_type"="column"); + + 8. 创建两张支持Colocat Join的表t1 和t2 CREATE TABLE `t1` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "" @@ -314,7 +330,7 @@ "colocate_with" = "t1" ); - 8. 创建一个数据文件存储在BOS上的 broker 外部表 + 9. 创建一个数据文件存储在BOS上的 broker 外部表 CREATE EXTERNAL TABLE example_db.table_broker ( k1 DATE ) diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 7da0c6e9b88cd9..55f569bc2d71b2 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -94,6 +94,9 @@ 8. 导入数据进行严格模式过滤 curl --location-trusted -u root -H "strict_mode: true" -T testData http://host:port/api/testDb/testTbl/_stream_load + 9. 
导入含有聚合模型为BITMAP_UNION列的表,可以是表中的列或者数据中的列用于生成BITMAP_UNION列 + curl --location-trusted -u root -H "columns: k1, k2, v1=to_bitmap(k1)" -T testData http://host:port/api/testDb/testTbl/_stream_load + ## keyword STREAM,LOAD diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 73985aa3c585d7..a0156d18d3e779 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -190,8 +190,8 @@ parser code {: :}; // Total keywords of doris -terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_AS, KW_ASC, KW_AUTHORS, - KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BOOLEAN, KW_BOTH, KW_BROKER, KW_BACKENDS, KW_BY, +terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_AS, KW_ASC, KW_AUTHORS, + KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BITMAP_UNION, KW_BOOLEAN, KW_BOTH, KW_BROKER, KW_BACKENDS, KW_BY, KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CLUSTER, KW_CLUSTERS, KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_COUNT, KW_CREATE, KW_CROSS, KW_CURRENT, KW_CURRENT_USER, @@ -1402,6 +1402,10 @@ opt_agg_type ::= {: RESULT = AggregateType.HLL_UNION; :} + | KW_BITMAP_UNION + {: + RESULT = AggregateType.BITMAP_UNION; + :} ; opt_partition ::= @@ -3886,6 +3890,8 @@ keyword ::= {: RESULT = id; :} | KW_BEGIN:id {: RESULT = id; :} + | KW_BITMAP_UNION:id + {: RESULT = id; :} | KW_BOOLEAN:id {: RESULT = id; :} | KW_BROKER:id diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index b2dbd3118d6606..074ae8cd76f31c 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -76,6 +76,8 @@ import java.util.Map; import java.util.Set; +import 
static org.apache.doris.catalog.AggregateType.BITMAP_UNION; + public class SchemaChangeHandler extends AlterHandler { private static final Logger LOG = LogManager.getLogger(SchemaChangeHandler.class); @@ -521,6 +523,10 @@ private void addColumnInternal(OlapTable olapTable, Column newColumn, ColumnPosi throw new DdlException("HLL must be used in AGG_KEYS"); } + if (newColumn.getAggregationType() == BITMAP_UNION && KeysType.AGG_KEYS != olapTable.getKeysType()) { + throw new DdlException("BITMAP_UNION must be used in AGG_KEYS"); + } + List baseSchema = olapTable.getBaseSchema(); String newColName = newColumn.getName(); boolean found = false; diff --git a/fe/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java b/fe/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java index 286740d0c4aa43..a10ba1cc60b836 100644 --- a/fe/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java +++ b/fe/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.doris.catalog.AggregateFunction; +import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,6 +151,7 @@ private TupleDescriptor createTupleDesc(Analyzer analyzer, boolean isOutputTuple // by compute stats and compute incremental stats, respectively. if (aggExpr.getFnName().getFunction().equals("count") || aggExpr.getFnName().getFunction().equals("ndv") + || aggExpr.getFnName().getFunction().equals(FunctionSet.BITMAP_UNION_INT) || aggExpr.getFnName().getFunction().equals("ndv_no_finalize")) { // TODO: Consider making nullability a property of types or of builtin agg fns. // row_number(), rank(), and dense_rank() are non-nullable as well. 
diff --git a/fe/src/main/java/org/apache/doris/analysis/BuiltinAggregateFunction.java b/fe/src/main/java/org/apache/doris/analysis/BuiltinAggregateFunction.java index 925ddf9a052f7a..cb3803b9b50ab2 100644 --- a/fe/src/main/java/org/apache/doris/analysis/BuiltinAggregateFunction.java +++ b/fe/src/main/java/org/apache/doris/analysis/BuiltinAggregateFunction.java @@ -110,6 +110,7 @@ public enum Operator { // external query changes if we find a better algorithm). NDV("NDV", TAggregationOp.HLL, ScalarType.createVarcharType(64)), HLL_UNION_AGG("HLL_UNION_AGG", TAggregationOp.HLL_C, ScalarType.createVarcharType(64)), + BITMAP_UNION("BITMAP_UNION", TAggregationOp.BITMAP_UNION, ScalarType.createVarcharType(10)), COUNT_DISTINCT("COUNT_DISITNCT", TAggregationOp.COUNT_DISTINCT, Type.BIGINT), SUM_DISTINCT("SUM_DISTINCT", TAggregationOp.SUM_DISTINCT, null), LAG("LAG", TAggregationOp.LAG, null), diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index bb43b9b4823748..22450457ddcdf3 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -48,6 +48,8 @@ import java.util.Map; import java.util.Set; +import static org.apache.doris.catalog.AggregateType.BITMAP_UNION; + public class CreateTableStmt extends DdlStmt { private static final Logger LOG = LogManager.getLogger(CreateTableStmt.class); @@ -246,6 +248,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { int rowLengthBytes = 0; boolean hasHll = false; + boolean hasBitmap = false; Set columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); for (ColumnDef columnDef : columnDefs) { if (engineName.equals("kudu")) { @@ -263,6 +266,14 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { hasHll = true; } + + if (columnDef.getAggregateType() == BITMAP_UNION) { + if (columnDef.isKey()) { + 
throw new AnalysisException("Key column can't has the BITMAP_UNION aggregation type"); + } + hasBitmap = true; + } + if (!columnSet.add(columnDef.getName())) { ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, columnDef.getName()); } @@ -279,6 +290,10 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { throw new AnalysisException("HLL must be used in AGG_KEYS"); } + if (hasBitmap && keysDesc.getKeysType() != KeysType.AGG_KEYS) { + throw new AnalysisException("BITMAP_UNION must be used in AGG_KEYS"); + } + if (engineName.equals("olap")) { // analyze partition if (partitionDesc != null) { diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java index 6e3cc297a3cded..3241120480e35f 100644 --- a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java +++ b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java @@ -18,9 +18,11 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.AggregateFunction; +import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Function; +import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.ScalarFunction; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -250,10 +252,6 @@ public boolean isCountStar() { return false; } - // public BuiltinAggregateFunction.Operator getAggOp() { - // return aggOp; - // } - @Override protected void toThrift(TExprNode msg) { // TODO: we never serialize this to thrift if it's an aggregate function @@ -380,6 +378,30 @@ private void analyzeBuiltinAggFunction(Analyzer analyzer) throws AnalysisExcepti "hll only use in HLL_UNION_AGG or HLL_CARDINALITY , HLL_HASH and so on."); } + if ((fnName.getFunction().equalsIgnoreCase(FunctionSet.BITMAP_UNION_INT) && !arg.type.isInteger32Type())) { 
+ throw new AnalysisException("BITMAP_UNION_INT params only support TINYINT or SMALLINT or INT"); + } + + if ((fnName.getFunction().equalsIgnoreCase(FunctionSet.BITMAP_UNION))) { + if (children.size() != 1) { + throw new AnalysisException("BITMAP_UNION function could only have one child"); + } + + if (getChild(0) instanceof SlotRef) { + SlotRef slotRef = (SlotRef) getChild(0); + if (slotRef.getDesc().getColumn().getAggregationType() != AggregateType.BITMAP_UNION) { + throw new AnalysisException("BITMAP_UNION function require the column is BITMAP_UNION aggregate type"); + } + } else if (getChild(0) instanceof FunctionCallExpr) { + FunctionCallExpr functionCallExpr = (FunctionCallExpr) getChild(0); + if (!functionCallExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.TO_BITMAP)) { + throw new AnalysisException("BITMAP_UNION function only support TO_BITMAP function as it's child"); + } + } else { + throw new AnalysisException("BITMAP_UNION only support BITMAP_UNION(column) or BITMAP_UNION(TO_BITMAP(column))"); + } + } + if ((fnName.getFunction().equalsIgnoreCase("HLL_UNION_AGG") || fnName.getFunction().equalsIgnoreCase("HLL_CARDINALITY") || fnName.getFunction().equalsIgnoreCase("HLL_RAW_AGG")) diff --git a/fe/src/main/java/org/apache/doris/catalog/AggregateType.java b/fe/src/main/java/org/apache/doris/catalog/AggregateType.java index 45b8ebb3d27fb7..a9edd7863cc681 100644 --- a/fe/src/main/java/org/apache/doris/catalog/AggregateType.java +++ b/fe/src/main/java/org/apache/doris/catalog/AggregateType.java @@ -34,7 +34,8 @@ public enum AggregateType { MAX("MAX"), REPLACE("REPLACE"), HLL_UNION("HLL_UNION"), - NONE("NONE"); + NONE("NONE"), + BITMAP_UNION("BITMAP_UNION"); private static EnumMap> compatibilityMap; @@ -87,6 +88,10 @@ public enum AggregateType { primitiveTypeList.clear(); primitiveTypeList.add(PrimitiveType.HLL); compatibilityMap.put(HLL_UNION, EnumSet.copyOf(primitiveTypeList)); + + primitiveTypeList.clear(); + 
primitiveTypeList.add(PrimitiveType.VARCHAR); + compatibilityMap.put(BITMAP_UNION, EnumSet.copyOf(primitiveTypeList)); compatibilityMap.put(NONE, EnumSet.allOf(PrimitiveType.class)); } @@ -127,6 +132,8 @@ public TAggregationType toThrift() { return TAggregationType.NONE; case HLL_UNION: return TAggregationType.HLL_UNION; + case BITMAP_UNION: + return TAggregationType.BITMAP_UNION; default: return null; } diff --git a/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java index 264339ad498455..a5c9c715a19f5c 100644 --- a/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java +++ b/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java @@ -513,6 +513,21 @@ public void init() { .build(); + public static final String BITMAP_UNION = "bitmap_union"; + public static final String BITMAP_UNION_INT = "bitmap_union_int"; + public static final String BITMAP_COUNT = "bitmap_count"; + public static final String TO_BITMAP = "to_bitmap"; + + private static final Map BITMAP_UNION_INT_SYMBOL = + ImmutableMap.builder() + .put(Type.TINYINT, + "_ZN5doris15BitmapFunctions17bitmap_update_intIN9doris_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE") + .put(Type.SMALLINT, + "_ZN5doris15BitmapFunctions17bitmap_update_intIN9doris_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE") + .put(Type.INT, + "_ZN5doris15BitmapFunctions17bitmap_update_intIN9doris_udf6IntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE") + .build(); + public Function getFunction(Function desc, Function.CompareMode mode) { List fns = functions.get(desc.functionName()); if (fns == null) { @@ -707,6 +722,16 @@ private void initAggregateBuiltins() { prefix + "30count_distinct_string_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE", false, true, true)); + addBuiltin(AggregateFunction.createBuiltin(BITMAP_UNION, Lists.newArrayList(t), + Type.VARCHAR, + Type.VARCHAR, + 
"_ZN5doris15BitmapFunctions11bitmap_initEPN9doris_udf15FunctionContextEPNS1_9StringValE", + "_ZN5doris15BitmapFunctions12bitmap_unionEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", + "_ZN5doris15BitmapFunctions12bitmap_unionEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", + "_ZN5doris15BitmapFunctions16bitmap_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + "_ZN5doris15BitmapFunctions16bitmap_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + true, false, true)); + } else if (t == Type.TINYINT || t == Type.SMALLINT || t == Type.INT || t == Type.BIGINT || t == Type.LARGEINT || t == Type.DOUBLE) { addBuiltin(AggregateFunction.createBuiltin("multi_distinct_count", Lists.newArrayList(t), @@ -826,6 +851,16 @@ private void initAggregateBuiltins() { prefix + "12hll_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE", true, false, true)); + // BITMAP_UNION_INT + addBuiltin(AggregateFunction.createBuiltin(BITMAP_UNION_INT, + Lists.newArrayList(t), Type.BIGINT, Type.VARCHAR, + "_ZN5doris15BitmapFunctions11bitmap_initEPN9doris_udf15FunctionContextEPNS1_9StringValE", + BITMAP_UNION_INT_SYMBOL.get(t), + "_ZN5doris15BitmapFunctions12bitmap_unionEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_", + "_ZN5doris15BitmapFunctions16bitmap_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + "_ZN5doris15BitmapFunctions15bitmap_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE", + true, false, true)); + // HLL_UNION_AGG addBuiltin(AggregateFunction.createBuiltin("hll_union_agg", Lists.newArrayList(t), Type.BIGINT, Type.VARCHAR, diff --git a/fe/src/main/java/org/apache/doris/catalog/Type.java b/fe/src/main/java/org/apache/doris/catalog/Type.java index c967d3239128c1..ab5b53b011c25c 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/src/main/java/org/apache/doris/catalog/Type.java @@ -213,6 +213,11 @@ public boolean isIntegerType() { return isScalarType(PrimitiveType.TINYINT) || 
isScalarType(PrimitiveType.SMALLINT) || isScalarType(PrimitiveType.INT) || isScalarType(PrimitiveType.BIGINT); } + + public boolean isInteger32Type() { + return isScalarType(PrimitiveType.TINYINT) || isScalarType(PrimitiveType.SMALLINT) + || isScalarType(PrimitiveType.INT); + } public boolean isLargeIntType() { return isScalarType(PrimitiveType.LARGEINT); diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index e4b6269ec744d3..5caf45c901b242 100644 --- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -48,6 +48,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.FunctionSet; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; @@ -536,6 +537,18 @@ private void turnOffPreAgg(AggregateInfo aggInfo, SelectStmt selectStmt, Analyze returnColumnValidate = false; break; } + } else if (aggExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.BITMAP_UNION_INT)) { + if ((!col.isKey())) { + turnOffReason = "BITMAP_UNION_INT function with non-key column: " + col.getName(); + returnColumnValidate = false; + break; + } + } else if (aggExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.BITMAP_UNION)) { + if (col.getAggregationType() != AggregateType.BITMAP_UNION) { + turnOffReason = "Aggregate Operator not match: BITMAP_UNION <--> " + col.getAggregationType(); + returnColumnValidate = false; + break; + } } else if (aggExpr.getFnName().getFunction().equalsIgnoreCase("multi_distinct_count")) { // count(distinct k1), count(distinct k2) / count(distinct k1,k2) can turn on pre aggregation if ((!col.isKey())) { diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex index 
fb80ff749474b2..9ffbf4f7134933 100644 --- a/fe/src/main/jflex/sql_scanner.flex +++ b/fe/src/main/jflex/sql_scanner.flex @@ -168,7 +168,8 @@ import org.apache.doris.common.util.SqlUtils; keywordMap.put("hash", new Integer(SqlParserSymbols.KW_HASH)); keywordMap.put("having", new Integer(SqlParserSymbols.KW_HAVING)); keywordMap.put("help", new Integer(SqlParserSymbols.KW_HELP)); - keywordMap.put("hll_union", new Integer(SqlParserSymbols.KW_HLL_UNION)); + keywordMap.put("hll_union", new Integer(SqlParserSymbols.KW_HLL_UNION)); + keywordMap.put("bitmap_union", new Integer(SqlParserSymbols.KW_BITMAP_UNION)); keywordMap.put("hub", new Integer(SqlParserSymbols.KW_HUB)); keywordMap.put("identified", new Integer(SqlParserSymbols.KW_IDENTIFIED)); keywordMap.put("if", new Integer(SqlParserSymbols.KW_IF)); diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index 3ed1d73d68fa98..0ddea3cacef257 100755 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -588,6 +588,14 @@ [['hll_hash'], 'VARCHAR', ['VARCHAR'], '_ZN5doris16HllHashFunctions8hll_hashEPN9doris_udf15FunctionContextERKNS1_9StringValE'], + #bitmap function + + [['to_bitmap'], 'VARCHAR', ['VARCHAR'], + '_ZN5doris15BitmapFunctions9to_bitmapEPN9doris_udf15FunctionContextERKNS1_9StringValE'], + [['bitmap_count'], 'BIGINT', ['VARCHAR'], + '_ZN5doris15BitmapFunctions12bitmap_countEPN9doris_udf15FunctionContextERKNS1_9StringValE'], + + # aes and base64 function [['aes_encrypt'], 'VARCHAR', ['VARCHAR', 'VARCHAR'], '_ZN5doris19EncryptionFunctions11aes_encryptEPN9doris_udf' diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 6c0d5541e0caeb..73a639d21f621e 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -327,6 +327,7 @@ enum TAggregationOp { ROW_NUMBER, LAG, HLL_C, + BITMAP_UNION, } //struct TAggregateFunctionCall { diff --git a/gensrc/thrift/Types.thrift 
b/gensrc/thrift/Types.thrift index 659f179d79ca36..c354e89390b725 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -129,7 +129,8 @@ enum TAggregationType { MIN, REPLACE, HLL_UNION, - NONE + NONE, + BITMAP_UNION } enum TPushType { diff --git a/run-ut.sh b/run-ut.sh index 1e18c2c736d673..dd982e60a5bea3 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -164,6 +164,7 @@ ${DORIS_TEST_BINARY_DIR}/exprs/string_functions_test ${DORIS_TEST_BINARY_DIR}/exprs/json_function_test ${DORIS_TEST_BINARY_DIR}/exprs/timestamp_functions_test ${DORIS_TEST_BINARY_DIR}/exprs/percentile_approx_test +${DORIS_TEST_BINARY_DIR}/exprs/bitmap_function_test ## Running geo unit test ${DORIS_TEST_BINARY_DIR}/geo/geo_functions_test