From 8bb1ee90804b70b3a907804aae812bf675b785d7 Mon Sep 17 00:00:00 2001 From: Olivier Meslin <44379737+oliviermeslin@users.noreply.github.com> Date: Wed, 13 Sep 2023 22:15:44 +0200 Subject: [PATCH 1/7] Solve issue #37655 [PR 35087](https://github.com/apache/arrow/pull/35087) introduced an explicit fail in large joins with Acero when key data is larger than 4GB (solving the problem reported by [issue 34474](https://github.com/apache/arrow/issues/34474)). However, I think (but I'm not sure) that this quick fix is too restrictive because the total size condition is applied to the total size of tables to be joined, rather than to the size of keys. As a consequence, Acero fails when trying to merge large tables, even when the size of key data is well below 4 GB. This PR modifies the source code so that the logical test only verifies whether the total size of _key variable_ is below 4 GB. --- cpp/src/arrow/acero/swiss_join.cc | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 3f11b89af39..89d04786793 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -440,7 +440,8 @@ void RowArray::DebugPrintToFile(const char* filename, bool print_sorted) const { Status RowArrayMerge::PrepareForMerge(RowArray* target, const std::vector& sources, std::vector* first_target_row_id, - MemoryPool* pool) { + MemoryPool* pool, + bool check_key_size) { ARROW_DCHECK(!sources.empty()); ARROW_DCHECK(sources[0]->is_initialized_); @@ -473,7 +474,7 @@ Status RowArrayMerge::PrepareForMerge(RowArray* target, (*first_target_row_id)[sources.size()] = num_rows; } - if (num_bytes > std::numeric_limits::max()) { + if (check_key_size && num_bytes > std::numeric_limits::max()) { return Status::Invalid( "There are more than 2^32 bytes of key data. Acero cannot " "process a join of this magnitude"); @@ -1331,7 +1332,7 @@ Status SwissTableForJoinBuild::PreparePrtnMerge() { partition_keys[i] = prtn_states_[i].keys.keys(); } RETURN_NOT_OK(RowArrayMerge::PrepareForMerge(target_->map_.keys(), partition_keys, - &partition_keys_first_row_id_, pool_)); + &partition_keys_first_row_id_, pool_, true)); // 2. SwissTable: // @@ -1354,7 +1355,7 @@ Status SwissTableForJoinBuild::PreparePrtnMerge() { } RETURN_NOT_OK(RowArrayMerge::PrepareForMerge(&target_->payloads_, partition_payloads, &partition_payloads_first_row_id_, - pool_)); + pool_, false)); } // Check if we have duplicate keys From 80cd210a9572b746ab66e813bdbd2a0a5eadac43 Mon Sep 17 00:00:00 2001 From: Olivier Meslin <44379737+oliviermeslin@users.noreply.github.com> Date: Thu, 5 Oct 2023 15:21:15 +0200 Subject: [PATCH 2/7] Add the new argument to swiss_join_internal.h --- cpp/src/arrow/acero/swiss_join_internal.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/swiss_join_internal.h b/cpp/src/arrow/acero/swiss_join_internal.h index 88b80f06f57..5508603fb2f 100644 --- a/cpp/src/arrow/acero/swiss_join_internal.h +++ b/cpp/src/arrow/acero/swiss_join_internal.h @@ -161,7 +161,7 @@ class RowArrayMerge { // static Status PrepareForMerge(RowArray* target, const std::vector& sources, std::vector* first_target_row_id, - MemoryPool* pool); + MemoryPool* pool, bool check_key_size); // Copy rows from source array to target array. // Both arrays must have the same row metadata. From 608778a67b63ac0408c33535d06505c392d368bd Mon Sep 17 00:00:00 2001 From: Olivier Meslin <44379737+oliviermeslin@users.noreply.github.com> Date: Tue, 10 Oct 2023 15:47:12 +0200 Subject: [PATCH 3/7] A code line should not be more than 90 characters long --- cpp/src/arrow/acero/swiss_join.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 89d04786793..c25c682c794 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1332,7 +1332,8 @@ Status SwissTableForJoinBuild::PreparePrtnMerge() { partition_keys[i] = prtn_states_[i].keys.keys(); } RETURN_NOT_OK(RowArrayMerge::PrepareForMerge(target_->map_.keys(), partition_keys, - &partition_keys_first_row_id_, pool_, true)); + &partition_keys_first_row_id_, pool_, + true)); // 2. SwissTable: // From a851b067d9a8e667bb848290fa7cc6164ee14a30 Mon Sep 17 00:00:00 2001 From: Olivier Meslin <44379737+oliviermeslin@users.noreply.github.com> Date: Thu, 12 Oct 2023 12:07:26 +0200 Subject: [PATCH 4/7] Update cpp/src/arrow/acero/swiss_join.cc Remove trailing whitespace --- cpp/src/arrow/acero/swiss_join.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index c25c682c794..fe386f160b0 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1332,7 +1332,7 @@ Status SwissTableForJoinBuild::PreparePrtnMerge() { partition_keys[i] = prtn_states_[i].keys.keys(); } RETURN_NOT_OK(RowArrayMerge::PrepareForMerge(target_->map_.keys(), partition_keys, - &partition_keys_first_row_id_, pool_, + &partition_keys_first_row_id_, pool_, true)); // 2. SwissTable: From 9715485bb388745db60c519282f53f113a1b25f8 Mon Sep 17 00:00:00 2001 From: Olivier Meslin <44379737+oliviermeslin@users.noreply.github.com> Date: Thu, 12 Oct 2023 22:26:20 +0200 Subject: [PATCH 5/7] Change the order of the arguments and the name of the new argument --- cpp/src/arrow/acero/swiss_join_internal.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/swiss_join_internal.h b/cpp/src/arrow/acero/swiss_join_internal.h index 5508603fb2f..38679de21bc 100644 --- a/cpp/src/arrow/acero/swiss_join_internal.h +++ b/cpp/src/arrow/acero/swiss_join_internal.h @@ -160,8 +160,9 @@ class RowArrayMerge { // caller can pass in nullptr to indicate that it is not needed. // static Status PrepareForMerge(RowArray* target, const std::vector& sources, + bool is_key_data, std::vector* first_target_row_id, - MemoryPool* pool, bool check_key_size); + MemoryPool* pool); // Copy rows from source array to target array. // Both arrays must have the same row metadata. From 0ebf92e2d009107317f68f77f12db4e725f94730 Mon Sep 17 00:00:00 2001 From: Olivier Meslin <44379737+oliviermeslin@users.noreply.github.com> Date: Thu, 12 Oct 2023 22:30:56 +0200 Subject: [PATCH 6/7] Adapt swiss_join.cc to the new Status --- cpp/src/arrow/acero/swiss_join.cc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index fe386f160b0..321ee0129b0 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -439,9 +439,9 @@ void RowArray::DebugPrintToFile(const char* filename, bool print_sorted) const { Status RowArrayMerge::PrepareForMerge(RowArray* target, const std::vector& sources, + bool is_key_data, std::vector* first_target_row_id, - MemoryPool* pool, - bool check_key_size) { + MemoryPool* pool) { ARROW_DCHECK(!sources.empty()); ARROW_DCHECK(sources[0]->is_initialized_); @@ -474,7 +474,7 @@ Status RowArrayMerge::PrepareForMerge(RowArray* target, (*first_target_row_id)[sources.size()] = num_rows; } - if (check_key_size && num_bytes > std::numeric_limits::max()) { + if (is_key_data && num_bytes > std::numeric_limits::max()) { return Status::Invalid( "There are more than 2^32 bytes of key data. Acero cannot " "process a join of this magnitude"); @@ -1331,9 +1331,10 @@ Status SwissTableForJoinBuild::PreparePrtnMerge() { for (int i = 0; i < num_prtns_; ++i) { partition_keys[i] = prtn_states_[i].keys.keys(); } + RETURN_NOT_OK(RowArrayMerge::PrepareForMerge(target_->map_.keys(), partition_keys, - &partition_keys_first_row_id_, pool_, - true)); + true, &partition_keys_first_row_id_, + pool_)); // 2. SwissTable: // @@ -1355,8 +1356,8 @@ Status SwissTableForJoinBuild::PreparePrtnMerge() { partition_payloads[i] = &prtn_states_[i].payloads; } RETURN_NOT_OK(RowArrayMerge::PrepareForMerge(&target_->payloads_, partition_payloads, - &partition_payloads_first_row_id_, - pool_, false)); + false, &partition_payloads_first_row_id_, + pool_)); } // Check if we have duplicate keys From 04eb7cbc60e9cf5dc72a39f410a7ff636ece26ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Cumplido?= Date: Fri, 13 Oct 2023 18:30:35 +0200 Subject: [PATCH 7/7] Minor lint fix --- cpp/src/arrow/acero/swiss_join.cc | 5 ++--- cpp/src/arrow/acero/swiss_join_internal.h | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 321ee0129b0..6fc99409091 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1332,9 +1332,8 @@ Status SwissTableForJoinBuild::PreparePrtnMerge() { partition_keys[i] = prtn_states_[i].keys.keys(); } - RETURN_NOT_OK(RowArrayMerge::PrepareForMerge(target_->map_.keys(), partition_keys, - true, &partition_keys_first_row_id_, - pool_)); + RETURN_NOT_OK(RowArrayMerge::PrepareForMerge(target_->map_.keys(), partition_keys, true, + &partition_keys_first_row_id_, pool_)); // 2. SwissTable: // diff --git a/cpp/src/arrow/acero/swiss_join_internal.h b/cpp/src/arrow/acero/swiss_join_internal.h index 38679de21bc..50efaba9d9c 100644 --- a/cpp/src/arrow/acero/swiss_join_internal.h +++ b/cpp/src/arrow/acero/swiss_join_internal.h @@ -160,7 +160,7 @@ class RowArrayMerge { // caller can pass in nullptr to indicate that it is not needed. // static Status PrepareForMerge(RowArray* target, const std::vector& sources, - bool is_key_data, + bool is_key_data, std::vector* first_target_row_id, MemoryPool* pool);