Avoid SerDe for aggregation query with object pool #1854
| auto* hll = new HyperLogLog((const uint8_t*) src_slice->data); | ||
| dst_slice->data = reinterpret_cast<char*>(hll); | ||
| arena->track_memory(sizeof(HyperLogLog)); |
I think sizeof(HyperLogLog) does not reflect the real size of the HLL; the two can differ greatly.
I agree with you.
But this should be a separate issue; this PR doesn't change this point.
We need a better way to estimate HLL and Bitmap memory usage.
ok
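To illustrate the concern about sizeof above, here is a minimal sketch. It assumes a hypothetical `SketchHll` class (not the real `HyperLogLog` layout, which this thread does not show) whose registers live on the heap, so that `sizeof` counts only the handle and a separate `memory_usage()` method is needed for tracking.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for an HLL-like object. The registers are stored on
// the heap, so sizeof(SketchHll) covers only the vector handle.
class SketchHll {
public:
    explicit SketchHll(std::size_t num_registers) : _registers(num_registers, 0) {}

    // Rough estimate of the real footprint: the object itself plus the
    // heap buffer currently reserved by the register vector.
    std::size_t memory_usage() const {
        return sizeof(*this) + _registers.capacity() * sizeof(std::uint8_t);
    }

private:
    std::vector<std::uint8_t> _registers;
};
```

With something like this, a call such as `arena->track_memory(hll->memory_usage())` would track a closer estimate than `sizeof`; whether the real class can expose such a method is an open question for the follow-up issue.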
be/src/runtime/row_batch.cpp
Outdated
| std::swap(_has_in_flight_row, other->_has_in_flight_row); | ||
| std::swap(_num_rows, other->_num_rows); | ||
| std::swap(_capacity, other->_capacity); | ||
| _agg_object_pool.swap(other->_agg_object_pool); |
This swap function seems to be unused; you can delete it.
OK
be/src/runtime/row_batch.cpp
Outdated
| void RowBatch::transfer_resource_ownership(RowBatch* dest) { | ||
| dest->_auxiliary_mem_usage += _tuple_data_pool->total_allocated_bytes(); | ||
| dest->_tuple_data_pool->acquire_data(_tuple_data_pool.get(), false); | ||
| dest->_agg_object_pool.swap(_agg_object_pool); |
The semantics here are a transfer, not a swap.
If dest is empty, the swap is correct; if not, it will be wrong.
Fixed.
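The difference between transfer and swap can be sketched as follows. `SketchObjectPool` and its `acquire_data` method are hypothetical names used for illustration, not the actual ObjectPool API in this PR; the point is only that a transfer appends to the destination and keeps whatever the destination already owned.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical minimal pool that owns heap-allocated ints.
class SketchObjectPool {
public:
    int* add(int value) {
        _objects.push_back(std::make_unique<int>(value));
        return _objects.back().get();
    }

    // Transfer semantics: move every object from src into this pool and leave
    // src empty. Objects already owned by this pool stay alive, which a plain
    // swap would not guarantee from the caller's point of view.
    void acquire_data(SketchObjectPool* src) {
        if (src == this) {
            return;  // nothing to transfer
        }
        for (auto& obj : src->_objects) {
            _objects.push_back(std::move(obj));
        }
        src->_objects.clear();
    }

    std::size_t size() const { return _objects.size(); }

private:
    std::vector<std::unique_ptr<int>> _objects;
};
```

With a swap, a non-empty destination would hand its own objects back to the source, whose lifetime is about to end; appending avoids that.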
| find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); | ||
| scan_node->set_scan_ranges(scan_ranges); | ||
| VLOG(1) << "scan_node_Id=" << scan_node->id() << " size=" << scan_ranges.size(); | ||
| scan_node->set_is_agg_node_child(has_agg_node); |
This will not work for SCAN1 if the plan tree looks like this:
        JOIN
          |
     ----------
     |        |
   SCAN1     AGG
              |
            SCAN2
There are other problems.
For example, if some HLL operation happens between the agg node and the scan node, it may cause problems.
Why not do it like this?
- ScanNode always returns the object representation.
- All functions and type handlers can handle both representations.
If we do that, we will not depend on what the plan tree looks like.
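A handler that accepts both representations might look like the sketch below. It borrows the convention visible later in this PR's udf.cpp diff (a zero length means the pointer refers to a live object), but `SketchCounter`, `SketchSlice`, and `read_counter` are hypothetical names, and the serialized form is simplified to a raw 8-byte value.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical in-memory object standing in for an HLL or bitmap.
struct SketchCounter {
    std::uint64_t value;
};

// Hypothetical slice: len == 0 means data points at a SketchCounter object,
// len > 0 means data points at serialized bytes.
struct SketchSlice {
    const char* data;
    std::size_t len;
};

// Reads the counter regardless of which representation the slice carries.
inline std::uint64_t read_counter(const SketchSlice& s) {
    if (s.len == 0) {
        // Object representation: no deserialization needed.
        return reinterpret_cast<const SketchCounter*>(s.data)->value;
    }
    // Serialized representation: assumed here to be exactly 8 raw bytes.
    assert(s.len == sizeof(std::uint64_t));
    std::uint64_t v = 0;
    std::memcpy(&v, s.data, sizeof(v));
    return v;
}
```

Because the check happens inside the handler, the caller does not need to know whether an agg node sits above the scan node.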
| row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); | ||
| while (_heap.size() > 0) { | ||
| init_row_with_others(&row_cursor, *(_heap.top().row_cursor), arena.get()); | ||
| init_row_with_others(&row_cursor, *(_heap.top().row_cursor), arena.get(), agg_object_pool.get()); |
Should the object pool be reset like the arena?
OK
| uint32_t row_checksum = 0; | ||
| while (true) { | ||
| OLAPStatus res = reader.next_row_with_aggregation(&row, arena.get(), &eof); | ||
| OLAPStatus res = reader.next_row_with_aggregation(&row, arena.get(), agg_object_pool.get(), &eof); |
Reset the object pool like the arena here as well.
OK
be/src/olap/row_cursor.h
Outdated
| size_t _variable_len; | ||
| // for agg query, we don't need to finalize when scan agg object data | ||
| bool need_agg_finalize = true; |
Is it a good idea to put this flag here? This struct only represents a row; it should not care how it is assembled.
be/src/olap/reader.cpp
Outdated
| _merged_rows += merged_count; | ||
| agg_finalize_row(_value_cids, row_cursor, arena); | ||
| // For agg query, we don't need finalize agg object and directly pass agg object to agg node | ||
| if (row_cursor->is_need_agg_finalize()) { |
Better to put the flag in ReaderParams.
OK
| int64_t output_rows = 0; | ||
| while (true) { | ||
| Arena arena; | ||
| ObjectPool objectPool; |
Wow, that's bad. We allocate a new arena and object pool for each row we read, which is very bad for performance.
I will fix this issue in another PR.
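One possible shape for that follow-up, sketched under assumptions: construct the pool once outside the loop and clear it per row, instead of constructing a new one on every iteration. `SketchPool` and `sum_rows` are hypothetical and only model the allocation pattern, not the real Arena or ObjectPool classes.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical pool with a clear() that releases objects but keeps the
// vector's reserved capacity for reuse.
class SketchPool {
public:
    int* add(int v) {
        _objects.push_back(std::make_unique<int>(v));
        return _objects.back().get();
    }
    void clear() { _objects.clear(); }
    std::size_t live() const { return _objects.size(); }

private:
    std::vector<std::unique_ptr<int>> _objects;
};

// Processes all rows with a single caller-provided pool that is cleared at the
// start of each iteration rather than reconstructed.
inline long sum_rows(const std::vector<int>& rows, SketchPool* pool) {
    long sum = 0;
    for (int row : rows) {
        pool->clear();           // drop objects from the previous row
        sum += *pool->add(row);  // per-row object lives until the next clear
    }
    return sum;
}
```

The same pattern would apply to the arena, provided it offers an equivalent reset operation.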
d1a9144 to
c0c283b
I suddenly remembered that you need to modify the HLL-related functions in be/src/exprs/aggregate_functions.cpp. Otherwise, it will cause a BE crash during the upgrade process.
Thanks for the reminder. Updated.
be/src/udf/udf.cpp
Outdated
| // zero size means the src input is a HyperLogLog object | ||
| if (other.len == 0) { | ||
| auto* hll = reinterpret_cast<doris::HyperLogLog*>(other.ptr); | ||
| uint8_t* ptr = ctx->allocate(doris::HLL_COLUMN_DEFAULT_LEN); |
Because there are private member variables named ptr and len, it would be better to rename this ptr to other_ptr.
done
imay
left a comment
LGTM
morningman
left a comment
LGTM
For #1610 "avoid serialize and deserialize from Scannode to Aggnode".
For aggregation queries, we pass the agg object directly from the scan node to the agg node and avoid unnecessary serialization and deserialization.