GH-45334: [C++][Acero] Fix swiss join overflow issues in row offset calculation for fixed length and null masks #45336
```diff
@@ -477,14 +477,15 @@ void RowArrayMerge::CopyFixedLength(RowTableImpl* target, const RowTableImpl& so
                                     const int64_t* source_rows_permutation) {
   int64_t num_source_rows = source.length();

-  int64_t fixed_length = target->metadata().fixed_length;
+  uint32_t fixed_length = target->metadata().fixed_length;

   // Permutation of source rows is optional. Without permutation all that is
   // needed is memcpy.
   //
   if (!source_rows_permutation) {
-    memcpy(target->mutable_data(1) + fixed_length * first_target_row_id, source.data(1),
-           fixed_length * num_source_rows);
+    DCHECK_LE(first_target_row_id, std::numeric_limits<uint32_t>::max());
+    memcpy(target->mutable_fixed_length_rows(static_cast<uint32_t>(first_target_row_id)),
+           source.fixed_length_rows(/*row_id=*/0), fixed_length * num_source_rows);
   } else {
     // Row length must be a multiple of 64-bits due to enforced alignment.
     // Loop for each output row copying a fixed number of 64-bit words.
```

Member (on the "Row length must be a multiple of 64-bits" comment): Unrelated, but that's an interesting piece of info. Doesn't it blow up memory use if there is a small number of very small columns?

Contributor (author): Yes, this is true. But IMHO this is acceptable because we also have other auxiliary data structures to aid the hash join, so I wouldn't say this is very bad.
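Since the thread above touches on it: the alignment rule quoted there means every fixed-length row is padded up to a multiple of 8 bytes. Below is a minimal standalone sketch (the helper name is hypothetical, not Arrow's API) of that round-up and of the per-row cost the reviewer is asking about.

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical helper: round a row width up to the next multiple of 8 bytes
// so that every row starts on a 64-bit boundary, as the diff's comment states.
inline uint32_t PaddedRowWidth(uint32_t row_width) {
  return (row_width + 7u) & ~7u;
}

int main() {
  // A single 1-byte column still occupies 8 bytes per row after padding;
  // this is the memory blow-up the reviewer's question refers to.
  std::cout << PaddedRowWidth(1) << "\n";   // 8
  std::cout << PaddedRowWidth(12) << "\n";  // 16
  std::cout << PaddedRowWidth(16) << "\n";  // 16
  return 0;
}
```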
```diff
@@ -494,10 +495,13 @@ void RowArrayMerge::CopyFixedLength(RowTableImpl* target, const RowTableImpl& so
     int64_t num_words_per_row = fixed_length / sizeof(uint64_t);
     for (int64_t i = 0; i < num_source_rows; ++i) {
       int64_t source_row_id = source_rows_permutation[i];
+      DCHECK_LE(source_row_id, std::numeric_limits<uint32_t>::max());
       const uint64_t* source_row_ptr = reinterpret_cast<const uint64_t*>(
-          source.data(1) + fixed_length * source_row_id);
+          source.fixed_length_rows(static_cast<uint32_t>(source_row_id)));
+      int64_t target_row_id = first_target_row_id + i;
+      DCHECK_LE(target_row_id, std::numeric_limits<uint32_t>::max());
       uint64_t* target_row_ptr = reinterpret_cast<uint64_t*>(
-          target->mutable_data(1) + fixed_length * (first_target_row_id + i));
+          target->mutable_fixed_length_rows(static_cast<uint32_t>(target_row_id)));

       for (int64_t word = 0; word < num_words_per_row; ++word) {
         target_row_ptr[word] = source_row_ptr[word];
```

Member (on the new `DCHECK_LE(source_row_id, ...)` line): If that's always the case then are we wasting memory and CPU cache by having a 64-bit permutation array?

Contributor (author): We are. In fact I have another WIP branch trying to clean them up.
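The expressions being replaced above (`fixed_length * source_row_id` and the like) are row-offset calculations of the kind named in the PR title, now routed through the `fixed_length_rows()` / `mutable_fixed_length_rows()` accessors. A minimal standalone sketch, not Arrow's actual accessor, of the overflow class involved: if such a product is evaluated with 32-bit operands it silently wraps once the byte offset passes 2^32, so one operand has to be widened first.

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>
#include <limits>

// Hypothetical helper, not Arrow's accessor: byte offset of a row inside a
// fixed-length row buffer. Widening before the multiplication keeps the
// product from wrapping when it exceeds 2^32.
inline uint64_t FixedLengthRowOffset(uint32_t fixed_length, uint32_t row_id) {
  return static_cast<uint64_t>(fixed_length) * row_id;
}

int main() {
  uint32_t fixed_length = 32;       // padded bytes per row
  uint32_t row_id = 200'000'000;    // well within the 2^32 row-id limit

  uint32_t wrapped = fixed_length * row_id;              // 32-bit product wraps
  uint64_t widened = FixedLengthRowOffset(fixed_length, row_id);

  std::cout << "wrapped: " << wrapped << "\n";   // 2105032704 (wrong)
  std::cout << "widened: " << widened << "\n";   // 6400000000 (correct)
  assert(widened > std::numeric_limits<uint32_t>::max());
  return 0;
}
```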
```diff
@@ -529,16 +533,16 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&

     // We can simply memcpy bytes of rows if their order has not changed.
     //
-    memcpy(target->mutable_data(2) + target_offsets[first_target_row_id], source.data(2),
-           source_offsets[num_source_rows] - source_offsets[0]);
+    memcpy(target->mutable_var_length_rows() + target_offsets[first_target_row_id],
+           source.var_length_rows(), source_offsets[num_source_rows] - source_offsets[0]);
   } else {
     int64_t target_row_offset = first_target_row_offset;
-    uint64_t* target_row_ptr =
-        reinterpret_cast<uint64_t*>(target->mutable_data(2) + target_row_offset);
+    uint64_t* target_row_ptr = reinterpret_cast<uint64_t*>(
+        target->mutable_var_length_rows() + target_row_offset);
     for (int64_t i = 0; i < num_source_rows; ++i) {
       int64_t source_row_id = source_rows_permutation[i];
       const uint64_t* source_row_ptr = reinterpret_cast<const uint64_t*>(
-          source.data(2) + source_offsets[source_row_id]);
+          source.var_length_rows() + source_offsets[source_row_id]);
       int64_t length = source_offsets[source_row_id + 1] - source_offsets[source_row_id];
       // Though the row offset is 64-bit, the length of a single row must be 32-bit as
       // required by current row table implementation.
```
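The comment closing this hunk states the constraint directly: var-length rows are addressed through 64-bit offsets, but any single row's length must still fit in 32 bits. A small sketch (the helper name is hypothetical) of deriving that length from the 64-bit offsets and checking the constraint before narrowing:

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical helper: length of row `row_id` derived from 64-bit offsets.
// The offsets themselves may exceed 2^32 for a large row table, but each
// individual row length is required to fit in 32 bits.
inline uint32_t VarLengthRowSize(const int64_t* offsets, int64_t row_id) {
  int64_t length = offsets[row_id + 1] - offsets[row_id];
  assert(length >= 0);
  assert(length <= std::numeric_limits<uint32_t>::max());
  return static_cast<uint32_t>(length);
}
```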
```diff
@@ -564,14 +568,18 @@ void RowArrayMerge::CopyNulls(RowTableImpl* target, const RowTableImpl& source,
                               const int64_t* source_rows_permutation) {
   int64_t num_source_rows = source.length();
   int num_bytes_per_row = target->metadata().null_masks_bytes_per_row;
-  uint8_t* target_nulls = target->null_masks() + num_bytes_per_row * first_target_row_id;
+  DCHECK_LE(first_target_row_id, std::numeric_limits<uint32_t>::max());
+  uint8_t* target_nulls =
+      target->mutable_null_masks(static_cast<uint32_t>(first_target_row_id));
   if (!source_rows_permutation) {
-    memcpy(target_nulls, source.null_masks(), num_bytes_per_row * num_source_rows);
+    memcpy(target_nulls, source.null_masks(/*row_id=*/0),
+           num_bytes_per_row * num_source_rows);
   } else {
-    for (int64_t i = 0; i < num_source_rows; ++i) {
+    for (uint32_t i = 0; i < num_source_rows; ++i) {
       int64_t source_row_id = source_rows_permutation[i];
+      DCHECK_LE(source_row_id, std::numeric_limits<uint32_t>::max());
       const uint8_t* source_nulls =
-          source.null_masks() + num_bytes_per_row * source_row_id;
+          source.null_masks(static_cast<uint32_t>(source_row_id));
       for (int64_t byte = 0; byte < num_bytes_per_row; ++byte) {
         *target_nulls++ = *source_nulls++;
       }
```
Reviewer question: Were these size constraints already implied but not tested for?

Author reply: We assume row id to be `uint32_t` (that means no more than 2^32 rows are allowed) almost everywhere, so this is implied. But there are still some places weirdly and unnecessarily using `int64_t` as row id, here included.
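As the reply says, row ids are assumed to fit in `uint32_t` almost everywhere, which is what the new `DCHECK_LE(..., std::numeric_limits<uint32_t>::max())` lines assert before casting. A tiny standalone sketch of that checked narrowing, with `assert` standing in for Arrow's DCHECK macros:

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Sketch of the narrowing pattern used in the diff: a caller-side int64_t
// row id must fit into the uint32_t row id the row table uses, so the
// conversion is asserted rather than silently truncated.
inline uint32_t CheckedRowId(int64_t row_id) {
  assert(row_id >= 0);
  assert(row_id <= std::numeric_limits<uint32_t>::max());
  return static_cast<uint32_t>(row_id);
}
```

The same assumption is why, per the earlier thread, 32-bit permutation entries would be enough.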