diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h index 98fff35d6caaa4..70d52e8c3f42f6 100644 --- a/be/src/common/object_pool.h +++ b/be/src/common/object_pool.h @@ -56,6 +56,13 @@ class ObjectPool { _objects.clear(); } + // Absorb all objects from src pool + // Note: This method is not thread safe + void acquire_data(ObjectPool* src) { + _objects.insert(_objects.end(), src->_objects.begin(), src->_objects.end()); + src->_objects.clear(); + } + private: struct GenericElement { virtual ~GenericElement() {} diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 5facfa6c5c2c54..ab0059645d783a 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -530,6 +530,25 @@ void ExecNode::collect_scan_nodes(vector* nodes) { collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); } +void ExecNode::try_do_aggregate_serde_improve() { + std::vector agg_node; + collect_nodes(TPlanNodeType::AGGREGATION_NODE, &agg_node); + if (agg_node.size() != 1) { + return; + } + + if (agg_node[0]->_children.size() != 1) { + return; + } + + if (agg_node[0]->_children[0]->type() != TPlanNodeType::OLAP_SCAN_NODE) { + return; + } + + OlapScanNode* scan_node = static_cast(agg_node[0]->_children[0]); + scan_node->set_no_agg_finalize(); +} + void ExecNode::init_runtime_profile(const std::string& name) { std::stringstream ss; ss << name << " (id=" << _id << ")"; diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index ace7935926c7cb..e40a4d316a538a 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -159,6 +159,12 @@ class ExecNode { // Collect all scan node types. void collect_scan_nodes(std::vector* nodes); + // When the agg node is the scan node direct parent, + // we directly return agg object from scan node to agg node, + // and don't serialize the agg object. + // This improve is cautious, we ensure the correctness firstly. + void try_do_aggregate_serde_improve(); + typedef bool (*EvalConjunctsFn)(ExprContext* const* ctxs, int num_ctxs, TupleRow* row); // Evaluate exprs over row. Returns true if all exprs return true. // TODO: This doesn't use the vector signature because I haven't figured diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 8f515515a57514..4acddb4d5bd69f 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -150,7 +150,6 @@ Status OlapScanNode::prepare(RuntimeState* state) { _rows_pushed_cond_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsPushedCondFiltered", TUnit::UNIT); _init_counter(state); - _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); if (_tuple_desc == NULL) { // TODO: make sure we print all available diagnostic output to our error log @@ -696,7 +695,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { scanner_ranges.push_back((*ranges)[i].get()); } OlapScanner* scanner = new OlapScanner( - state, this, _olap_scan_node.is_preaggregation, *scan_range, scanner_ranges); + state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges); _scanner_pool->add(scanner); _olap_scanners.push_back(scanner); } diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 434e423e828896..fa280b4425a415 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -58,7 +58,9 @@ class OlapScanNode : public ScanNode { Status collect_query_statistics(QueryStatistics* statistics) override; virtual Status close(RuntimeState* state); virtual Status set_scan_ranges(const std::vector& scan_ranges); - + inline void set_no_agg_finalize() { + _need_agg_finalize = false; + } protected: typedef struct { Tuple* tuple; @@ -242,6 +244,8 @@ class OlapScanNode : public ScanNode { int64_t _running_thread; EvalConjunctsFn _eval_conjuncts_fn; + bool _need_agg_finalize = true; + // Counters RuntimeProfile::Counter* _io_timer = nullptr; RuntimeProfile::Counter* _read_compressed_counter = nullptr; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 008cd5424e3a55..f4afb85ae3dd2c 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -43,6 +43,7 @@ OlapScanner::OlapScanner( RuntimeState* runtime_state, OlapScanNode* parent, bool aggregation, + bool need_agg_finalize, const TPaloScanRange& scan_range, const std::vector& key_ranges) : _runtime_state(runtime_state), @@ -52,6 +53,7 @@ OlapScanner::OlapScanner( _string_slots(parent->_string_slots), _is_open(false), _aggregation(aggregation), + _need_agg_finalize(need_agg_finalize), _tuple_idx(parent->_tuple_idx), _direct_conjunct_size(parent->_direct_conjunct_size) { _reader.reset(new Reader()); @@ -213,6 +215,11 @@ Status OlapScanner::_init_params( } _read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); + // If a agg node is this scan node direct parent + // we will not call agg object finalize method in scan node, + // to avoid the unnecessary SerDe and improve query performance + _params.need_agg_finalize = _need_agg_finalize; + return Status::OK(); } @@ -264,7 +271,7 @@ Status OlapScanner::get_batch( break; } // Read one row from reader - auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), eof); + auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), batch->agg_object_pool(), eof); if (res != OLAP_SUCCESS) { return Status::InternalError("Internal Error: read storage fail."); } diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index 0708da6c7fedff..b163518f30efdd 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -53,6 +53,7 @@ class OlapScanner { RuntimeState* runtime_state, OlapScanNode* parent, bool aggregation, + bool need_agg_finalize, const TPaloScanRange& scan_range, const std::vector& key_ranges); @@ -107,6 +108,7 @@ class OlapScanner { int _id; bool _is_open; bool _aggregation; + bool _need_agg_finalize = true; bool _has_update_counter = false; Status _ctor_status; diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp index 0c363d6c59fc71..0a513f3e415712 100644 --- a/be/src/exprs/aggregate_functions.cpp +++ b/be/src/exprs/aggregate_functions.cpp @@ -1191,7 +1191,7 @@ void AggregateFunctions::hll_union_agg_update(FunctionContext* ctx, } DCHECK(!dst->is_null); - dst->agg_parse_and_cal(src); + dst->agg_parse_and_cal(ctx, src); return ; } diff --git a/be/src/exprs/bitmap_function.cpp b/be/src/exprs/bitmap_function.cpp index 5a42e798fb84ee..6a45fc5c725820 100644 --- a/be/src/exprs/bitmap_function.cpp +++ b/be/src/exprs/bitmap_function.cpp @@ -48,15 +48,24 @@ BigIntVal BitmapFunctions::bitmap_finalize(FunctionContext* ctx, const StringVal } void BitmapFunctions::bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst) { - RoaringBitmap src_bitmap = RoaringBitmap((char*)src.ptr); auto* dst_bitmap = reinterpret_cast(dst->ptr); - dst_bitmap->merge(src_bitmap); + // zero size means the src input is a agg object + if (src.len == 0) { + dst_bitmap->merge(*reinterpret_cast(src.ptr)); + } else { + dst_bitmap->merge(RoaringBitmap((char*)src.ptr)); + } } BigIntVal BitmapFunctions::bitmap_count(FunctionContext* ctx, const StringVal& src) { - RoaringBitmap bitmap ((char*)src.ptr); - BigIntVal result(bitmap.cardinality()); - return result; + // zero size means the src input is a agg object + if (src.len == 0) { + auto bitmap = reinterpret_cast(src.ptr); + return {bitmap->cardinality()}; + } else { + RoaringBitmap bitmap ((char*)src.ptr); + return {bitmap.cardinality()}; + } } StringVal BitmapFunctions::to_bitmap(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src) { diff --git a/be/src/exprs/hll_function.cpp b/be/src/exprs/hll_function.cpp index e91f947b345d28..246f60906af635 100644 --- a/be/src/exprs/hll_function.cpp +++ b/be/src/exprs/hll_function.cpp @@ -64,10 +64,15 @@ void HllFunctions::hll_update(FunctionContext *, const T &src, StringVal* dst) { dst_hll->update(hash_value); } } + void HllFunctions::hll_merge(FunctionContext*, const StringVal &src, StringVal* dst) { - HyperLogLog src_hll((uint8_t*)src.ptr); auto* dst_hll = reinterpret_cast(dst->ptr); - dst_hll->merge(src_hll); + // zero size means the src input is a agg object + if (src.len == 0) { + dst_hll->merge(*reinterpret_cast(src.ptr)); + } else { + dst_hll->merge(HyperLogLog(src.ptr)); + } } BigIntVal HllFunctions::hll_finalize(FunctionContext*, const StringVal &src) { diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h index 8381dca094db7d..b3155cf747a8fc 100644 --- a/be/src/olap/aggregate_func.h +++ b/be/src/olap/aggregate_func.h @@ -17,6 +17,7 @@ #pragma once +#include "common/object_pool.h" #include "olap/hll.h" #include "olap/types.h" #include "olap/row_cursor_cell.h" @@ -28,7 +29,7 @@ namespace doris { -using AggInitFunc = void (*)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena); +using AggInitFunc = void (*)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool); using AggUpdateFunc = void (*)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena); using AggFinalizeFunc = void (*)(RowCursorCell* src, Arena* arena); @@ -43,8 +44,8 @@ class AggregateInfo { // Memory Note: For plain memory can be allocated from arena, whose lifetime // will last util finalize function is called. Memory allocated from heap should // be freed in finalize functioin to avoid memory leak. - inline void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const { - _init_fn(dst, src, src_null, arena); + inline void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const { + _init_fn(dst, src, src_null, arena, agg_pool); } // Update aggregated intermediate data. Data stored in engine is aggregated. @@ -73,7 +74,7 @@ class AggregateInfo { FieldAggregationMethod agg_method() const { return _agg_method; } private: - void (*_init_fn)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena); + void (*_init_fn)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool); void (*_update_fn)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena); void (*_finalize_fn)(RowCursorCell* src, Arena* arena); @@ -87,7 +88,7 @@ class AggregateInfo { template struct BaseAggregateFuncs { - static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) { dst->set_is_null(src_null); if (src_null) { return; @@ -113,7 +114,7 @@ struct AggregateFuncTraits : public BaseAggregateFuncs { template <> struct AggregateFuncTraits : public BaseAggregateFuncs { - static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) { dst->set_is_null(src_null); if (src_null) { return; @@ -129,7 +130,7 @@ struct AggregateFuncTraits template <> struct AggregateFuncTraits : public BaseAggregateFuncs { - static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) { dst->set_is_null(src_null); if (src_null) { return; @@ -144,7 +145,7 @@ struct AggregateFuncTraits struct AggregateFuncTraits : public BaseAggregateFuncs { - static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) { dst->set_is_null(src_null); if (src_null) { return; @@ -398,17 +399,21 @@ struct AggregateFuncTraits // so when init, update hll, the src is not null template <> struct AggregateFuncTraits { - static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) { DCHECK_EQ(src_null, false); dst->set_not_null(); auto* src_slice = reinterpret_cast(src); auto* dst_slice = reinterpret_cast(dst->mutable_cell_ptr()); - dst_slice->size = sizeof(HyperLogLog); - // use 'placement new' to allocate HyperLogLog on arena, so that we can control the memory usage. - char* mem = arena->Allocate(dst_slice->size); - dst_slice->data = (char*) new (mem) HyperLogLog((const uint8_t*)src_slice->data); + // we use zero size represent this slice is a agg object + dst_slice->size = 0; + auto* hll = new HyperLogLog((const uint8_t*) src_slice->data); + dst_slice->data = reinterpret_cast(hll); + + arena->track_memory(sizeof(HyperLogLog)); + + agg_pool->add(hll); } static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -425,33 +430,36 @@ struct AggregateFuncTraits(src_slice->data); dst_hll->merge(*src_hll); - // NOT use 'delete src_hll' because the memory is managed by arena - src_hll->~HyperLogLog(); } } + // The HLL object memory will be released by ObjectPool static void finalize(RowCursorCell* src, Arena* arena) { auto *slice = reinterpret_cast(src->mutable_cell_ptr()); auto *hll = reinterpret_cast(slice->data); slice->data = arena->Allocate(HLL_COLUMN_DEFAULT_LEN); slice->size = hll->serialize((uint8_t*)slice->data); - // NOT using 'delete hll' because the memory is managed by arena - hll->~HyperLogLog(); } }; // when data load, after bitmap_init fucntion, bitmap_union column won't be null // so when init, update bitmap, the src is not null template <> struct AggregateFuncTraits { - static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) { + static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) { DCHECK_EQ(src_null, false); dst->set_not_null(); auto* src_slice = reinterpret_cast(src); auto* dst_slice = reinterpret_cast(dst->mutable_cell_ptr()); - dst_slice->size = sizeof(RoaringBitmap); - dst_slice->data = (char*)new RoaringBitmap(src_slice->data); + // we use zero size represent this slice is a agg object + dst_slice->size = 0; + auto* bitmap = new RoaringBitmap(src_slice->data); + dst_slice->data = (char*) bitmap; + + arena->track_memory(sizeof(RoaringBitmap)); + + agg_pool->add(bitmap); } static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) { @@ -468,11 +476,10 @@ struct AggregateFuncTraits(src_slice->data); dst_bitmap->merge(*src_bitmap); - - delete src_bitmap; } } + // The RoaringBitmap object memory will be released by ObjectPool static void finalize(RowCursorCell* src, Arena *arena) { auto *slice = reinterpret_cast(src->mutable_cell_ptr()); auto *bitmap = reinterpret_cast(slice->data); @@ -480,8 +487,6 @@ struct AggregateFuncTraitssize = bitmap->size(); slice->data = arena->Allocate(slice->size); bitmap->serialize(slice->data); - - delete bitmap; } }; diff --git a/be/src/olap/field.h b/be/src/olap/field.h index 32ee098a1e418d..caecdd909f1628 100644 --- a/be/src/olap/field.h +++ b/be/src/olap/field.h @@ -66,8 +66,8 @@ class Field { _agg_info->finalize(dst, arena); } - virtual void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const { - _agg_info->init(dst, src, src_null, arena); + virtual void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const { + _agg_info->init(dst, src, src_null, arena, agg_pool); } // todo(kks): Unify AggregateInfo::init method and Field::agg_init method @@ -76,7 +76,7 @@ class Field { // This functionn differs copy functionn in that if this filed // contain aggregate information, this functionn will initialize // destination in aggregate format, and update with srouce content. - virtual void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const { + virtual void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena, ObjectPool* agg_pool) const { direct_copy(dst, src); } @@ -344,7 +344,7 @@ class CharField: public Field { } // the char field is especial, which need the _length info when consume raw data - void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const override { + void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const override { dst->set_is_null(src_null); if (src_null) { return; @@ -403,8 +403,8 @@ class BitmapAggField: public Field { } // bitmap storage data always not null - void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const override { - _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena); + void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena, ObjectPool* agg_pool) const override { + _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena, agg_pool); } char* allocate_memory(char* cell_ptr, char* variable_ptr) const override { @@ -424,8 +424,8 @@ class HllAggField: public Field { } // Hll storage data always not null - void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const override { - _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena); + void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena, ObjectPool* agg_pool) const override { + _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena, agg_pool); } char* allocate_memory(char* cell_ptr, char* variable_ptr) const override { diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 5a1b240ae219b9..ae7e82b23ba67f 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -68,7 +68,7 @@ void MemTable::insert(Tuple* tuple) { bool is_null = tuple->is_null(slot->null_indicator_offset()); void* value = tuple->get_slot(slot->tuple_offset()); - _schema->column(i)->consume(&cell, (const char *)value, is_null, _skip_list->arena()); + _schema->column(i)->consume(&cell, (const char *)value, is_null, _skip_list->arena(), &_agg_object_pool); } bool overwritten = false; diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 80be744cf7708d..3856c83058835a 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H #define DORIS_BE_SRC_OLAP_MEMTABLE_H +#include "common/object_pool.h" #include "olap/schema.h" #include "olap/skiplist.h" #include "runtime/tuple.h" @@ -57,6 +58,7 @@ class MemTable { RowCursorComparator _row_comparator; Arena _arena; + ObjectPool _agg_object_pool; typedef SkipList Table; char* _tuple_buf; diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 31417381033a64..ae1867efca271b 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -44,14 +44,14 @@ OLAPStatus Merger::merge_rowsets(TabletSharedPtr tablet, RETURN_NOT_OK_LOG(row_cursor.init(tablet->tablet_schema()), "failed to init row cursor when merging rowsets of tablet " + tablet->full_name()); row_cursor.allocate_memory_for_string_type(tablet->tablet_schema()); - // The following procedure would last for long time, half of one day, etc. int64_t output_rows = 0; while (true) { Arena arena; + ObjectPool objectPool; bool eof = false; // Read one row into row_cursor - RETURN_NOT_OK_LOG(reader.next_row_with_aggregation(&row_cursor, &arena, &eof), + RETURN_NOT_OK_LOG(reader.next_row_with_aggregation(&row_cursor, &arena, &objectPool, &eof), "failed to read next row when merging rowsets of tablet " + tablet->full_name()); if (eof) { break; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 2151c1277bb8a4..9255faa82e6b8a 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -345,7 +345,7 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return OLAP_SUCCESS; } -OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) { +OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) { if (UNLIKELY(_next_key == nullptr)) { *eof = true; return OLAP_SUCCESS; @@ -360,12 +360,12 @@ OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool* return OLAP_SUCCESS; } -OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) { +OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) { if (UNLIKELY(_next_key == nullptr)) { *eof = true; return OLAP_SUCCESS; } - init_row_with_others(row_cursor, *_next_key, arena); + init_row_with_others(row_cursor, *_next_key, arena, agg_pool); int64_t merged_count = 0; do { auto res = _collect_iter->next(&_next_key, &_next_delete_flag); @@ -389,11 +389,15 @@ OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool* ++merged_count; } while (true); _merged_rows += merged_count; - agg_finalize_row(_value_cids, row_cursor, arena); + // For agg query, we don't need finalize agg object and directly pass agg object to agg node + if (_need_agg_finalize) { + agg_finalize_row(_value_cids, row_cursor, arena); + } + return OLAP_SUCCESS; } -OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) { +OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) { *eof = false; bool cur_delete_flag = false; do { @@ -403,7 +407,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, boo } cur_delete_flag = _next_delete_flag; - init_row_with_others(row_cursor, *_next_key, arena); + init_row_with_others(row_cursor, *_next_key, arena, agg_pool); int64_t merged_count = 0; while (NULL != _next_key) { @@ -564,6 +568,7 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) { read_params.check_validation(); OLAPStatus res = OLAP_SUCCESS; _aggregation = read_params.aggregation; + _need_agg_finalize = read_params.need_agg_finalize; _reader_type = read_params.reader_type; _tablet = read_params.tablet; _version = read_params.version; diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 48f3be4fb02c8a..2183cae0ad63ee 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -53,6 +53,7 @@ struct ReaderParams { TabletSharedPtr tablet; ReaderType reader_type; bool aggregation; + bool need_agg_finalize = true; Version version; // possible values are "gt", "ge", "eq" std::string range; @@ -125,8 +126,8 @@ class Reader { // Return OLAP_SUCCESS and set `*eof` to false when next row is read into `row_cursor`. // Return OLAP_SUCCESS and set `*eof` to true when no more rows can be read. // Return others when unexpected error happens. - OLAPStatus next_row_with_aggregation(RowCursor *row_cursor, Arena* arena, bool *eof) { - return (this->*_next_row_func)(row_cursor, arena, eof); + OLAPStatus next_row_with_aggregation(RowCursor *row_cursor, Arena* arena, ObjectPool* agg_pool, bool *eof) { + return (this->*_next_row_func)(row_cursor, arena, agg_pool, eof); } uint64_t merged_rows() const { @@ -200,9 +201,9 @@ class Reader { OLAPStatus _init_load_bf_columns(const ReaderParams& read_params); - OLAPStatus _dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof); - OLAPStatus _agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof); - OLAPStatus _unique_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof); + OLAPStatus _dup_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof); + OLAPStatus _agg_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof); + OLAPStatus _unique_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof); TabletSharedPtr tablet() { return _tablet; } @@ -233,9 +234,11 @@ class Reader { DeleteHandler _delete_handler; - OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, Arena* arena, bool* eof) = nullptr; + OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) = nullptr; bool _aggregation; + // for agg query, we don't need to finalize when scan agg object data + bool _need_agg_finalize = true; bool _version_locked; ReaderType _reader_type; bool _next_delete_flag; diff --git a/be/src/olap/row.h b/be/src/olap/row.h index 1dd0581df40971..64ea139d2c56bf 100644 --- a/be/src/olap/row.h +++ b/be/src/olap/row.h @@ -107,10 +107,10 @@ int index_compare_row(const LhsRowType& lhs, const RhsRowType& rhs) { // function will first initialize destination column and then update with source column // value. template -void init_row_with_others(DstRowType* dst, const SrcRowType& src, Arena* arena) { +void init_row_with_others(DstRowType* dst, const SrcRowType& src, Arena* arena, ObjectPool* agg_pool) { for (auto cid : dst->schema()->column_ids()) { auto dst_cell = dst->cell(cid); - dst->schema()->column(cid)->agg_init(&dst_cell, src.cell(cid), arena); + dst->schema()->column(cid)->agg_init(&dst_cell, src.cell(cid), arena, agg_pool); } } diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 9bf5bb01d9be81..b7eed210b501f3 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -569,6 +569,7 @@ bool RowBlockMerger::merge( uint64_t tmp_merged_rows = 0; RowCursor row_cursor; std::unique_ptr arena(new Arena()); + std::unique_ptr agg_object_pool(new ObjectPool()); if (row_cursor.init(_tablet->tablet_schema()) != OLAP_SUCCESS) { LOG(WARNING) << "fail to init row cursor."; goto MERGE_ERR; @@ -578,7 +579,7 @@ bool RowBlockMerger::merge( row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); while (_heap.size() > 0) { - init_row_with_others(&row_cursor, *(_heap.top().row_cursor), arena.get()); + init_row_with_others(&row_cursor, *(_heap.top().row_cursor), arena.get(), agg_object_pool.get()); if (!_pop_heap()) { goto MERGE_ERR; @@ -604,6 +605,7 @@ bool RowBlockMerger::merge( // the memory allocate by arena has been copied, // so we should release these memory immediately arena.reset(new Arena()); + agg_object_pool.reset(new ObjectPool()); } if (rowset_writer->flush() != OLAP_SUCCESS) { LOG(WARNING) << "failed to finalizing writer."; diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 43b5ad36bddd53..99d2333cabfb87 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -105,6 +105,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { RowCursor row; std::unique_ptr arena(new Arena()); + std::unique_ptr agg_object_pool(new ObjectPool()); res = row.init(tablet->tablet_schema(), reader_params.return_columns); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("failed to init row cursor. [res=%d]", res); @@ -115,7 +116,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { bool eof = false; uint32_t row_checksum = 0; while (true) { - OLAPStatus res = reader.next_row_with_aggregation(&row, arena.get(), &eof); + OLAPStatus res = reader.next_row_with_aggregation(&row, arena.get(), agg_object_pool.get(), &eof); if (res == OLAP_SUCCESS && eof) { VLOG(3) << "reader reads to the end."; break; @@ -128,6 +129,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { // the memory allocate by arena has been copied, // so we should release these memory immediately arena.reset(new Arena()); + agg_object_pool.reset(new ObjectPool()); } LOG(INFO) << "success to finish compute checksum. checksum=" << row_checksum; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 729542e898715a..4a9eb0e02db7d7 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -173,6 +173,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { VLOG(1) << "scan_nodes.size()=" << scan_nodes.size(); VLOG(1) << "params.per_node_scan_ranges.size()=" << params.per_node_scan_ranges.size(); + _plan->try_do_aggregate_serde_improve(); + for (int i = 0; i < scan_nodes.size(); ++i) { ScanNode* scan_node = static_cast(scan_nodes[i]); const std::vector& scan_ranges = diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 6e31a446fca1ba..19152a234c8680 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -48,7 +48,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_ _row_desc(row_desc), _auxiliary_mem_usage(0), _need_to_return(false), - _tuple_data_pool(new MemPool(_mem_tracker)) { + _tuple_data_pool(new MemPool(_mem_tracker)), + _agg_object_pool(new ObjectPool()) { DCHECK(_mem_tracker != NULL); DCHECK_GT(capacity, 0); _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*); @@ -82,7 +83,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, _row_desc(row_desc), _auxiliary_mem_usage(0), _need_to_return(false), - _tuple_data_pool(new MemPool(_mem_tracker)) { + _tuple_data_pool(new MemPool(_mem_tracker)), + _agg_object_pool(new ObjectPool()) { DCHECK(_mem_tracker != nullptr); _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*); DCHECK_GT(_tuple_ptrs_size, 0); @@ -173,7 +175,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch, _row_desc(row_desc), _auxiliary_mem_usage(0), _need_to_return(false), - _tuple_data_pool(new MemPool(_mem_tracker)) { + _tuple_data_pool(new MemPool(_mem_tracker)), + _agg_object_pool(new ObjectPool()) { DCHECK(_mem_tracker != NULL); _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*); DCHECK_GT(_tuple_ptrs_size, 0); @@ -259,6 +262,7 @@ void RowBatch::clear() { } _tuple_data_pool->free_all(); + _agg_object_pool.reset(new ObjectPool()); for (int i = 0; i < _io_buffers.size(); ++i) { _io_buffers[i]->return_buffer(); } @@ -464,6 +468,7 @@ void RowBatch::reset() { // TODO: Change this to Clear() and investigate the repercussions. _tuple_data_pool->free_all(); + _agg_object_pool.reset(new ObjectPool()); for (int i = 0; i < _io_buffers.size(); ++i) { _io_buffers[i]->return_buffer(); } @@ -500,35 +505,30 @@ void RowBatch::close_tuple_streams() { void RowBatch::transfer_resource_ownership(RowBatch* dest) { dest->_auxiliary_mem_usage += _tuple_data_pool->total_allocated_bytes(); dest->_tuple_data_pool->acquire_data(_tuple_data_pool.get(), false); + dest->_agg_object_pool->acquire_data(_agg_object_pool.get()); for (int i = 0; i < _io_buffers.size(); ++i) { DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i]; dest->_io_buffers.push_back(buffer); dest->_auxiliary_mem_usage += buffer->buffer_len(); buffer->set_mem_tracker(dest->_mem_tracker); } - _io_buffers.clear(); for (BufferInfo& buffer_info : _buffers) { dest->add_buffer( buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES); } - _buffers.clear(); for (int i = 0; i < _tuple_streams.size(); ++i) { dest->_tuple_streams.push_back(_tuple_streams[i]); dest->_auxiliary_mem_usage += _tuple_streams[i]->byte_size(); } - _tuple_streams.clear(); + for (int i = 0; i < _blocks.size(); ++i) { dest->_blocks.push_back(_blocks[i]); dest->_auxiliary_mem_usage += _blocks[i]->buffer_len(); } - _blocks.clear(); + dest->_need_to_return |= _need_to_return; - _auxiliary_mem_usage = 0; - if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) { - _tuple_ptrs = NULL; - } if (_needs_deep_copy) { dest->mark_needs_deep_copy(); @@ -589,28 +589,6 @@ void RowBatch::acquire_state(RowBatch* src) { src->transfer_resource_ownership(this); } -void RowBatch::swap(RowBatch* other) { - DCHECK(_row_desc.equals(other->_row_desc)); - DCHECK_EQ(_num_tuples_per_row, other->_num_tuples_per_row); - DCHECK_EQ(_tuple_ptrs_size, other->_tuple_ptrs_size); - - // The destination row batch should be empty. - DCHECK(!_has_in_flight_row); - DCHECK_EQ(_tuple_data_pool->total_reserved_bytes(), 0); - - std::swap(_has_in_flight_row, other->_has_in_flight_row); - std::swap(_num_rows, other->_num_rows); - std::swap(_capacity, other->_capacity); - if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) { - // Tuple pointers are allocated from tuple_data_pool_ so are transferred. - _tuple_ptrs = other->_tuple_ptrs; - other->_tuple_ptrs = NULL; - } else { - // tuple_ptrs_ were allocated with malloc so can be swapped between batches. - std::swap(_tuple_ptrs, other->_tuple_ptrs); - } -} - // TODO: consider computing size of batches as they are built up int RowBatch::total_byte_size() { int result = 0; diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h index 531c6ec830eb93..7aaf2a311ce866 100644 --- a/be/src/runtime/row_batch.h +++ b/be/src/runtime/row_batch.h @@ -245,6 +245,9 @@ class RowBatch : public RowBatchInterface { MemPool* tuple_data_pool() { return _tuple_data_pool.get(); } + ObjectPool* agg_object_pool() { + return _agg_object_pool.get(); + } int num_io_buffers() const { return _io_buffers.size(); } @@ -322,6 +325,7 @@ class RowBatch : public RowBatchInterface { // Transfer ownership of resources to dest. This includes tuple data in mem // pool and io buffers. + // we firstly update dest resource, and then reset current resource void transfer_resource_ownership(RowBatch* dest); void copy_row(TupleRow* src, TupleRow* dest) { @@ -384,14 +388,6 @@ class RowBatch : public RowBatchInterface { int num_buffers() const { return _buffers.size(); } - // Swaps all of the row batch state with 'other'. This is used for scan nodes - // which produce RowBatches asynchronously. Typically, an ExecNode is handed - // a row batch to populate (pull model) but ScanNodes have multiple threads - // which push row batches. This function is used to swap the pushed row batch - // contents with the row batch that's passed from the caller. - // TODO: this is wasteful and makes a copy that's unnecessary. Think about cleaning - // this up. - void swap(RowBatch* other); const RowDescriptor& row_desc() const { return _row_desc; @@ -484,6 +480,9 @@ class RowBatch : public RowBatchInterface { // holding (some of the) data referenced by rows boost::scoped_ptr _tuple_data_pool; + // holding some complex agg object data (bitmap, hll) + std::unique_ptr _agg_object_pool; + // IO buffers current owned by this row batch. Ownership of IO buffers transfer // between row batches. Any IO buffer will be owned by at most one row batch // (i.e. they are not ref counted) so most row batches don't own any. diff --git a/be/src/udf/udf.cpp b/be/src/udf/udf.cpp index 4b49c3caded134..aba14cda1e2f45 100755 --- a/be/src/udf/udf.cpp +++ b/be/src/udf/udf.cpp @@ -465,9 +465,19 @@ void HllVal::init(FunctionContext* ctx) { is_null = false; } -void HllVal::agg_parse_and_cal(const HllVal &other) { +void HllVal::agg_parse_and_cal(FunctionContext* ctx, const HllVal& other) { doris::HllSetResolver resolver; - resolver.init((char*)other.ptr, other.len); + + // zero size means the src input is a HyperLogLog object + if (other.len == 0) { + auto* hll = reinterpret_cast(other.ptr); + uint8_t* other_ptr = ctx->allocate(doris::HLL_COLUMN_DEFAULT_LEN); + int other_len = hll->serialize(ptr); + resolver.init((char*)other_ptr, other_len); + } else { + resolver.init((char*)other.ptr, other.len); + } + resolver.parse(); if (resolver.get_hll_data_type() == doris::HLL_DATA_EMPTY) { diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index 192de668e77cc3..4cf5d9bbf5eaf1 100755 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -777,7 +777,7 @@ struct HllVal : public StringVal { void init(FunctionContext* ctx); - void agg_parse_and_cal(const HllVal &other); + void agg_parse_and_cal(FunctionContext* ctx, const HllVal& other); void agg_merge(const HllVal &other); }; diff --git a/be/src/util/arena.h b/be/src/util/arena.h index 4a260e91ffc8f2..ef042eb768ff99 100644 --- a/be/src/util/arena.h +++ b/be/src/util/arena.h @@ -34,6 +34,11 @@ class Arena { return memory_usage_.load(std::memory_order_relaxed); } + // For the object wasn't allocated from Arena, but need to + // collect and control the object memory usage. + void track_memory(size_t bytes) { + memory_usage_.store(MemoryUsage() + bytes,std::memory_order_relaxed); + } private: char* AllocateFallback(size_t bytes); char* AllocateNewBlock(size_t block_bytes); diff --git a/be/test/olap/aggregate_func_test.cpp b/be/test/olap/aggregate_func_test.cpp index 7d4ca56d743aa4..21f2cc6575a96f 100644 --- a/be/test/olap/aggregate_func_test.cpp +++ b/be/test/olap/aggregate_func_test.cpp @@ -19,6 +19,7 @@ #include +#include "common/object_pool.h" #include "olap/decimal12.h" #include "olap/uint24.h" @@ -37,6 +38,7 @@ void test_min() { char buf[64]; Arena arena; + ObjectPool agg_object_pool; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_MIN, field_type); RowCursorCell dst(buf); @@ -44,7 +46,7 @@ void test_min() { { char val_buf[16]; *(bool*)val_buf = true; - agg->init(&dst, val_buf, true, &arena); + agg->init(&dst, val_buf, true, &arena, &agg_object_pool); ASSERT_TRUE(*(bool*)(buf)); } // 100 @@ -110,6 +112,7 @@ void test_max() { char buf[64]; Arena arena; + ObjectPool agg_object_pool; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_MAX, field_type); RowCursorCell dst(buf); @@ -117,7 +120,7 @@ void test_max() { { char val_buf[16]; *(bool*)val_buf = true; - agg->init(&dst, val_buf, true, &arena); + agg->init(&dst, val_buf, true, &arena, &agg_object_pool); ASSERT_TRUE(*(bool*)(buf)); } // 100 @@ -183,13 +186,14 @@ void test_sum() { RowCursorCell dst(buf); Arena arena; + ObjectPool agg_object_pool; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_SUM, field_type); // null { char val_buf[16]; *(bool*)val_buf = true; - agg->init(&dst, val_buf, true, &arena); + agg->init(&dst, val_buf, true, &arena, &agg_object_pool); ASSERT_TRUE(*(bool*)(buf)); } // 100 @@ -255,13 +259,14 @@ void test_replace() { RowCursorCell dst(buf); Arena arena; + ObjectPool agg_object_pool; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_REPLACE, field_type); // null { char val_buf[16]; *(bool*)val_buf = true; - agg->init(&dst, val_buf, true, &arena); + agg->init(&dst, val_buf, true, &arena, &agg_object_pool); ASSERT_TRUE(*(bool*)(buf)); } // 100 @@ -312,6 +317,7 @@ void test_replace_string() { dst_slice->size = 0; Arena arena; + ObjectPool agg_object_pool; const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_REPLACE, field_type); char src[string_field_size]; @@ -320,7 +326,7 @@ void test_replace_string() { // null { src_cell.set_null(); - agg->init(&dst_cell, (const char*)src_slice, true, &arena); + agg->init(&dst_cell, (const char*)src_slice, true, &arena, &agg_object_pool); ASSERT_TRUE(dst_cell.is_null()); } // "12345" diff --git a/be/test/olap/row_cursor_test.cpp b/be/test/olap/row_cursor_test.cpp index ab538b4c4871bd..cd26618432d391 100644 --- a/be/test/olap/row_cursor_test.cpp +++ b/be/test/olap/row_cursor_test.cpp @@ -17,6 +17,7 @@ #include +#include "common/object_pool.h" #include "olap/row_cursor.h" #include "olap/tablet_schema.h" #include "olap/row.h" @@ -470,7 +471,8 @@ TEST_F(TestRowCursor, AggregateWithoutNull) { left.set_field_content(4, reinterpret_cast(&l_decimal), _mem_pool.get()); left.set_field_content(5, reinterpret_cast(&l_varchar), _mem_pool.get()); - init_row_with_others(&row, left, arena.get()); + ObjectPool agg_object_pool; + init_row_with_others(&row, left, arena.get(), &agg_object_pool); RowCursor right; res = right.init(tablet_schema); @@ -528,7 +530,8 @@ TEST_F(TestRowCursor, AggregateWithNull) { left.set_null(4); left.set_field_content(5, reinterpret_cast(&l_varchar), _mem_pool.get()); - init_row_with_others(&row, left, arena.get()); + ObjectPool agg_object_pool; + init_row_with_others(&row, left, arena.get(), &agg_object_pool); RowCursor right; res = right.init(tablet_schema);