Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions be/src/common/object_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class ObjectPool {
_objects.clear();
}

// Absorb all objects from src pool
// Note: This method is not thread safe
void acquire_data(ObjectPool* src) {
_objects.insert(_objects.end(), src->_objects.begin(), src->_objects.end());
src->_objects.clear();
}

private:
struct GenericElement {
virtual ~GenericElement() {}
Expand Down
19 changes: 19 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,25 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
}

void ExecNode::try_do_aggregate_serde_improve() {
std::vector<ExecNode*> agg_node;
collect_nodes(TPlanNodeType::AGGREGATION_NODE, &agg_node);
if (agg_node.size() != 1) {
return;
}

if (agg_node[0]->_children.size() != 1) {
return;
}

if (agg_node[0]->_children[0]->type() != TPlanNodeType::OLAP_SCAN_NODE) {
return;
}

OlapScanNode* scan_node = static_cast<OlapScanNode*>(agg_node[0]->_children[0]);
scan_node->set_no_agg_finalize();
}

void ExecNode::init_runtime_profile(const std::string& name) {
std::stringstream ss;
ss << name << " (id=" << _id << ")";
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ class ExecNode {
// Collect all scan node types.
void collect_scan_nodes(std::vector<ExecNode*>* nodes);

// When the agg node is the scan node direct parent,
// we directly return agg object from scan node to agg node,
// and don't serialize the agg object.
// This improve is cautious, we ensure the correctness firstly.
void try_do_aggregate_serde_improve();

typedef bool (*EvalConjunctsFn)(ExprContext* const* ctxs, int num_ctxs, TupleRow* row);
// Evaluate exprs over row. Returns true if all exprs return true.
// TODO: This doesn't use the vector<Expr*> signature because I haven't figured
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ Status OlapScanNode::prepare(RuntimeState* state) {
_rows_pushed_cond_filtered_counter =
ADD_COUNTER(_runtime_profile, "RowsPushedCondFiltered", TUnit::UNIT);
_init_counter(state);

_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == NULL) {
// TODO: make sure we print all available diagnostic output to our error log
Expand Down Expand Up @@ -696,7 +695,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
scanner_ranges.push_back((*ranges)[i].get());
}
OlapScanner* scanner = new OlapScanner(
state, this, _olap_scan_node.is_preaggregation, *scan_range, scanner_ranges);
state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges);
_scanner_pool->add(scanner);
_olap_scanners.push_back(scanner);
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ class OlapScanNode : public ScanNode {
Status collect_query_statistics(QueryStatistics* statistics) override;
virtual Status close(RuntimeState* state);
virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges);

inline void set_no_agg_finalize() {
_need_agg_finalize = false;
}
protected:
typedef struct {
Tuple* tuple;
Expand Down Expand Up @@ -242,6 +244,8 @@ class OlapScanNode : public ScanNode {
int64_t _running_thread;
EvalConjunctsFn _eval_conjuncts_fn;

bool _need_agg_finalize = true;

// Counters
RuntimeProfile::Counter* _io_timer = nullptr;
RuntimeProfile::Counter* _read_compressed_counter = nullptr;
Expand Down
9 changes: 8 additions & 1 deletion be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ OlapScanner::OlapScanner(
RuntimeState* runtime_state,
OlapScanNode* parent,
bool aggregation,
bool need_agg_finalize,
const TPaloScanRange& scan_range,
const std::vector<OlapScanRange*>& key_ranges)
: _runtime_state(runtime_state),
Expand All @@ -52,6 +53,7 @@ OlapScanner::OlapScanner(
_string_slots(parent->_string_slots),
_is_open(false),
_aggregation(aggregation),
_need_agg_finalize(need_agg_finalize),
_tuple_idx(parent->_tuple_idx),
_direct_conjunct_size(parent->_direct_conjunct_size) {
_reader.reset(new Reader());
Expand Down Expand Up @@ -213,6 +215,11 @@ Status OlapScanner::_init_params(
}
_read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());

// If a agg node is this scan node direct parent
// we will not call agg object finalize method in scan node,
// to avoid the unnecessary SerDe and improve query performance
_params.need_agg_finalize = _need_agg_finalize;

return Status::OK();
}

Expand Down Expand Up @@ -264,7 +271,7 @@ Status OlapScanner::get_batch(
break;
}
// Read one row from reader
auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), eof);
auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), batch->agg_object_pool(), eof);
if (res != OLAP_SUCCESS) {
return Status::InternalError("Internal Error: read storage fail.");
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class OlapScanner {
RuntimeState* runtime_state,
OlapScanNode* parent,
bool aggregation,
bool need_agg_finalize,
const TPaloScanRange& scan_range,
const std::vector<OlapScanRange*>& key_ranges);

Expand Down Expand Up @@ -107,6 +108,7 @@ class OlapScanner {
int _id;
bool _is_open;
bool _aggregation;
bool _need_agg_finalize = true;
bool _has_update_counter = false;

Status _ctor_status;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/aggregate_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,7 @@ void AggregateFunctions::hll_union_agg_update(FunctionContext* ctx,
}
DCHECK(!dst->is_null);

dst->agg_parse_and_cal(src);
dst->agg_parse_and_cal(ctx, src);
return ;
}

Expand Down
19 changes: 14 additions & 5 deletions be/src/exprs/bitmap_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,24 @@ BigIntVal BitmapFunctions::bitmap_finalize(FunctionContext* ctx, const StringVal
}

void BitmapFunctions::bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
RoaringBitmap src_bitmap = RoaringBitmap((char*)src.ptr);
auto* dst_bitmap = reinterpret_cast<RoaringBitmap*>(dst->ptr);
dst_bitmap->merge(src_bitmap);
// zero size means the src input is a agg object
if (src.len == 0) {
dst_bitmap->merge(*reinterpret_cast<RoaringBitmap*>(src.ptr));
} else {
dst_bitmap->merge(RoaringBitmap((char*)src.ptr));
}
}

BigIntVal BitmapFunctions::bitmap_count(FunctionContext* ctx, const StringVal& src) {
RoaringBitmap bitmap ((char*)src.ptr);
BigIntVal result(bitmap.cardinality());
return result;
// zero size means the src input is a agg object
if (src.len == 0) {
auto bitmap = reinterpret_cast<RoaringBitmap*>(src.ptr);
return {bitmap->cardinality()};
} else {
RoaringBitmap bitmap ((char*)src.ptr);
return {bitmap.cardinality()};
}
}

StringVal BitmapFunctions::to_bitmap(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src) {
Expand Down
9 changes: 7 additions & 2 deletions be/src/exprs/hll_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@ void HllFunctions::hll_update(FunctionContext *, const T &src, StringVal* dst) {
dst_hll->update(hash_value);
}
}

void HllFunctions::hll_merge(FunctionContext*, const StringVal &src, StringVal* dst) {
HyperLogLog src_hll((uint8_t*)src.ptr);
auto* dst_hll = reinterpret_cast<HyperLogLog*>(dst->ptr);
dst_hll->merge(src_hll);
// zero size means the src input is a agg object
if (src.len == 0) {
dst_hll->merge(*reinterpret_cast<HyperLogLog*>(src.ptr));
} else {
dst_hll->merge(HyperLogLog(src.ptr));
}
}

BigIntVal HllFunctions::hll_finalize(FunctionContext*, const StringVal &src) {
Expand Down
53 changes: 29 additions & 24 deletions be/src/olap/aggregate_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "common/object_pool.h"
#include "olap/hll.h"
#include "olap/types.h"
#include "olap/row_cursor_cell.h"
Expand All @@ -28,7 +29,7 @@

namespace doris {

using AggInitFunc = void (*)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena);
using AggInitFunc = void (*)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool);
using AggUpdateFunc = void (*)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena);
using AggFinalizeFunc = void (*)(RowCursorCell* src, Arena* arena);

Expand All @@ -43,8 +44,8 @@ class AggregateInfo {
// Memory Note: For plain memory can be allocated from arena, whose lifetime
// will last util finalize function is called. Memory allocated from heap should
// be freed in finalize functioin to avoid memory leak.
inline void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const {
_init_fn(dst, src, src_null, arena);
inline void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const {
_init_fn(dst, src, src_null, arena, agg_pool);
}

// Update aggregated intermediate data. Data stored in engine is aggregated.
Expand Down Expand Up @@ -73,7 +74,7 @@ class AggregateInfo {
FieldAggregationMethod agg_method() const { return _agg_method; }

private:
void (*_init_fn)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena);
void (*_init_fn)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool);
void (*_update_fn)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena);
void (*_finalize_fn)(RowCursorCell* src, Arena* arena);

Expand All @@ -87,7 +88,7 @@ class AggregateInfo {

template<FieldType field_type>
struct BaseAggregateFuncs {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
dst->set_is_null(src_null);
if (src_null) {
return;
Expand All @@ -113,7 +114,7 @@ struct AggregateFuncTraits : public BaseAggregateFuncs<field_type> {
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DECIMAL> :
public BaseAggregateFuncs<OLAP_FIELD_TYPE_DECIMAL> {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
dst->set_is_null(src_null);
if (src_null) {
return;
Expand All @@ -129,7 +130,7 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DECIMAL>
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DATETIME> :
public BaseAggregateFuncs<OLAP_FIELD_TYPE_DECIMAL> {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
dst->set_is_null(src_null);
if (src_null) {
return;
Expand All @@ -144,7 +145,7 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DATETIME
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DATE> :
public BaseAggregateFuncs<OLAP_FIELD_TYPE_DECIMAL> {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
dst->set_is_null(src_null);
if (src_null) {
return;
Expand Down Expand Up @@ -398,17 +399,21 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_REPLACE, OLAP_FIELD_TYPE_CHAR>
// so when init, update hll, the src is not null
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL> {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
DCHECK_EQ(src_null, false);
dst->set_not_null();

auto* src_slice = reinterpret_cast<const Slice*>(src);
auto* dst_slice = reinterpret_cast<Slice*>(dst->mutable_cell_ptr());

dst_slice->size = sizeof(HyperLogLog);
// use 'placement new' to allocate HyperLogLog on arena, so that we can control the memory usage.
char* mem = arena->Allocate(dst_slice->size);
dst_slice->data = (char*) new (mem) HyperLogLog((const uint8_t*)src_slice->data);
// we use zero size represent this slice is a agg object
dst_slice->size = 0;
auto* hll = new HyperLogLog((const uint8_t*) src_slice->data);
dst_slice->data = reinterpret_cast<char*>(hll);

arena->track_memory(sizeof(HyperLogLog));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think sizeof(HyperLogLog) does not reflect the real size of HLL, and even has great difference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you.
But this should be another issue, This PR don't change this point.
We need a better way to estimate the HLL and Bitmap memory usage.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


agg_pool->add(hll);
}

static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) {
Expand All @@ -425,33 +430,36 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL
} else { // for stream load
auto* src_hll = reinterpret_cast<HyperLogLog*>(src_slice->data);
dst_hll->merge(*src_hll);
// NOT use 'delete src_hll' because the memory is managed by arena
src_hll->~HyperLogLog();
}
}

// The HLL object memory will be released by ObjectPool
static void finalize(RowCursorCell* src, Arena* arena) {
auto *slice = reinterpret_cast<Slice*>(src->mutable_cell_ptr());
auto *hll = reinterpret_cast<HyperLogLog*>(slice->data);

slice->data = arena->Allocate(HLL_COLUMN_DEFAULT_LEN);
slice->size = hll->serialize((uint8_t*)slice->data);
// NOT using 'delete hll' because the memory is managed by arena
hll->~HyperLogLog();
}
};
// when data load, after bitmap_init fucntion, bitmap_union column won't be null
// so when init, update bitmap, the src is not null
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_VARCHAR> {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
DCHECK_EQ(src_null, false);
dst->set_not_null();
auto* src_slice = reinterpret_cast<const Slice*>(src);
auto* dst_slice = reinterpret_cast<Slice*>(dst->mutable_cell_ptr());

dst_slice->size = sizeof(RoaringBitmap);
dst_slice->data = (char*)new RoaringBitmap(src_slice->data);
// we use zero size represent this slice is a agg object
dst_slice->size = 0;
auto* bitmap = new RoaringBitmap(src_slice->data);
dst_slice->data = (char*) bitmap;

arena->track_memory(sizeof(RoaringBitmap));

agg_pool->add(bitmap);
}

static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) {
Expand All @@ -468,20 +476,17 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_
} else { // for stream load
auto* src_bitmap = reinterpret_cast<RoaringBitmap*>(src_slice->data);
dst_bitmap->merge(*src_bitmap);

delete src_bitmap;
}
}

// The RoaringBitmap object memory will be released by ObjectPool
static void finalize(RowCursorCell* src, Arena *arena) {
auto *slice = reinterpret_cast<Slice*>(src->mutable_cell_ptr());
auto *bitmap = reinterpret_cast<RoaringBitmap*>(slice->data);

slice->size = bitmap->size();
slice->data = arena->Allocate(slice->size);
bitmap->serialize(slice->data);

delete bitmap;
}
};

Expand Down
Loading