From b01a03edba6b06d288185865fb577b42f91ca7d5 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Thu, 9 May 2024 20:44:47 +0800 Subject: [PATCH 01/13] Test to repro --- cpp/src/arrow/acero/asof_join_node_test.cc | 29 ++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index d95d2aaad36..6bb5d109d21 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1678,5 +1678,34 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) { /*slow_r0=*/false); } +TEST(AsofJoinTest, Flaky) { + std::vector left_types = {int64(), utf8()}; + auto left_batch = ExecBatchFromJSON( + left_types, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, "f"]])"); + std::vector right_types = {int64(), utf8(), float64()}; + auto right_batch = + ExecBatchFromJSON(right_types, R"([[2, "a", 1.0], [9, "b", 3.0], [15, "g", 5.0]])"); + + Declaration left{ + "exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("colA", int64()), field("col2", utf8())}), + {std::move(left_batch)})}; + Declaration right{ + "exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("colB", int64()), field("col3", utf8()), + field("colC", float64())}), + {std::move(right_batch)})}; + AsofJoinNodeOptions asof_join_opts({{{"colA"}, {{"col2"}}}, {{"colB"}, {{"col3"}}}}, 1); + Declaration asof_join{"asofjoin", {left, right}, asof_join_opts}; + + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(asof_join)); + + std::vector exp_types = {int64(), utf8(), float64()}; + auto exp_batch = ExecBatchFromJSON( + exp_types, + R"([[1, "a", 1.0], [1, "b", null], [5, "a", null], [6, "b", null], [7, "f", null]])"); + AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); +} + } // namespace acero } // namespace arrow From 223951e031012e948027ce8d26f1ed3bb37cacf5 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Fri, 10 May 2024 13:53:17 +0800 Subject: [PATCH 02/13] Fix --- cpp/src/arrow/acero/asof_join_node.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 48cc83dd3d6..b75896c38f4 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -1394,7 +1394,10 @@ class AsofJoinNode : public ExecNode { DEBUG_SYNC(this, "received batch from input ", k, ":", DEBUG_MANIP(std::endl), rb->ToString(), DEBUG_MANIP(std::endl)); - ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb)); + { + std::lock_guard guard(gate_); + ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb)); + } process_.Push(true); return Status::OK(); } From a499690e9ba9aec67a13e58454550894fcac6140 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Fri, 10 May 2024 16:54:55 +0800 Subject: [PATCH 03/13] Another repro --- cpp/src/arrow/acero/asof_join_node_test.cc | 27 +++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 6bb5d109d21..176e03477af 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1678,7 +1678,8 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) { /*slow_r0=*/false); } -TEST(AsofJoinTest, Flaky) { +// GH-40675. +TEST(AsofJoinTest, Flaky1) { std::vector left_types = {int64(), utf8()}; auto left_batch = ExecBatchFromJSON( left_types, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, "f"]])"); @@ -1707,5 +1708,29 @@ TEST(AsofJoinTest, Flaky) { AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); } +// GH-41149. +TEST(AsofJoinTest, Flaky2) { + std::vector left_types = {int64()}; + auto left_batch = ExecBatchFromJSON(left_types, R"([[1], [2], [3]])"); + std::vector right_types = {utf8(), int64()}; + auto right_batch = ExecBatchFromJSON(right_types, R"([["Z", 2], ["B", 3], ["A", 4]])"); + + Declaration left{"exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("on", int64())}), + {std::move(left_batch)})}; + Declaration right{ + "exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("colVals", utf8()), field("on", int64())}), + {std::move(right_batch)})}; + AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1); + Declaration asof_join{"asofjoin", {left, right}, asof_join_opts}; + + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(asof_join)); + + std::vector exp_types = {int64(), utf8()}; + auto exp_batch = ExecBatchFromJSON(exp_types, R"([[1, "Z"], [2, "Z"], [3, "B"]])"); + AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); +} + } // namespace acero } // namespace arrow From ade727e68488aad1f34ea635800b1b579c5fe17a Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Fri, 10 May 2024 23:56:21 +0800 Subject: [PATCH 04/13] Use double locking --- cpp/src/arrow/acero/asof_join_node.cc | 34 +++++++++++++++++---------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index b75896c38f4..e5ba6e4f53b 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -963,15 +963,22 @@ class AsofJoinNode : public ExecNode { // If LHS is finished or empty then there's nothing we can do here if (lhs.Finished() || lhs.Empty()) break; - // Advance each of the RHS as far as possible to be up to date for the LHS timestamp - ARROW_ASSIGN_OR_RAISE(bool any_rhs_advanced, UpdateRhs()); - - // If we have received enough inputs to produce the next output batch - // (decided by IsUpToDateWithLhsRow), we will perform the join and - // materialize the output batch. The join is done by advancing through - // the LHS and adding joined row to rows_ (done by Emplace). Finally, - // input batches that are no longer needed are removed to free up memory. - if (IsUpToDateWithLhsRow()) { + bool any_rhs_advanced{}; + bool is_up_to_date_with_lhs_row{}; + { + std::lock_guard guard(push_gate_); + // Advance each of the RHS as far as possible to be up to date for the LHS + // timestamp + ARROW_ASSIGN_OR_RAISE(any_rhs_advanced, UpdateRhs()); + + // If we have received enough inputs to produce the next output batch + // (decided by IsUpToDateWithLhsRow), we will perform the join and + // materialize the output batch. The join is done by advancing through + // the LHS and adding joined row to rows_ (done by Emplace). Finally, + // input batches that are no longer needed are removed to free up memory. + is_up_to_date_with_lhs_row = IsUpToDateWithLhsRow(); + } + if (is_up_to_date_with_lhs_row) { dst.Emplace(state_, tolerance_); ARROW_ASSIGN_OR_RAISE(bool advanced, lhs.Advance()); if (!advanced) break; // if we can't advance LHS, we're done for this batch @@ -1032,7 +1039,7 @@ class AsofJoinNode : public ExecNode { } bool Process() { - std::lock_guard guard(gate_); + std::lock_guard guard(finish_gate_); if (!CheckEnded()) { return false; } @@ -1395,7 +1402,7 @@ class AsofJoinNode : public ExecNode { rb->ToString(), DEBUG_MANIP(std::endl)); { - std::lock_guard guard(gate_); + std::lock_guard guard(push_gate_); ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb)); } process_.Push(true); @@ -1404,7 +1411,7 @@ class AsofJoinNode : public ExecNode { Status InputFinished(ExecNode* input, int total_batches) override { { - std::lock_guard guard(gate_); + std::lock_guard guard(finish_gate_); ARROW_DCHECK(std_has(inputs_, input)); size_t k = std_find(inputs_, input) - inputs_.begin(); state_.at(k)->set_total_batches(total_batches); @@ -1458,7 +1465,8 @@ class AsofJoinNode : public ExecNode { // InputStates // Each input state corresponds to an input table std::vector> state_; - std::mutex gate_; + std::mutex finish_gate_; + std::mutex push_gate_; TolType tolerance_; #ifndef NDEBUG std::ostream* debug_os_; From a4a3b1d79455ca7d6a87d8452760e95e510a1425 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Sat, 11 May 2024 01:23:16 +0800 Subject: [PATCH 05/13] Lockless solution --- cpp/src/arrow/acero/asof_join_node.cc | 91 ++++++++++------------ cpp/src/arrow/acero/asof_join_node_test.cc | 38 ++++----- 2 files changed, 60 insertions(+), 69 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index e5ba6e4f53b..0b2775843f8 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -548,8 +548,10 @@ class InputState { // true when the queue is empty and, when memo may have future entries (the case of a // positive tolerance), when the memo is empty. // used when checking whether RHS is up to date with LHS. - bool CurrentEmpty() const { - return memo_.no_future_ ? Empty() : memo_.times_.empty() && Empty(); + // NOTE: The emptiness must be decided by an atomic all to Empty() in caller, due to the + // potential race with Push(), see GH-41614. + bool CurrentEmpty(bool empty) const { + return memo_.no_future_ ? empty : memo_.times_.empty() && empty; } // in case memo may not have future entries (the case of a non-positive tolerance), @@ -650,13 +652,15 @@ class InputState { // timestamp, update latest_time and latest_ref_row to the value that immediately pass // the horizon. Update the memo-store with any entries or future entries so observed. // Returns true if updates were made, false if not. - Result AdvanceAndMemoize(OnType ts) { + // NOTE: The emptiness must be decided by an atomic all to Empty() in caller, due to the + // potential race with Push(), see GH-41614. + Result AdvanceAndMemoize(OnType ts, bool empty) { // Advance the right side row index until we reach the latest right row (for each key) // for the given left timestamp. DEBUG_SYNC(node_, "Advancing input ", index_, DEBUG_MANIP(std::endl)); // Check if already updated for TS (or if there is no latest) - if (Empty()) { // can't advance if empty and no future entries + if (empty) { // can't advance if empty and no future entries return memo_.no_future_ ? false : memo_.RemoveEntriesWithLesserTime(ts); } @@ -918,34 +922,34 @@ class CompositeTableBuilder { // guaranteeing this probability is below 1 in a billion. The fix is 128-bit hashing. // See ARROW-17653 class AsofJoinNode : public ExecNode { - // Advances the RHS as far as possible to be up to date for the current LHS timestamp - Result UpdateRhs() { + // Advances the RHS as far as possible to be up to date for the current LHS timestamp. + // Returns a pair of booleans. The first is if any RHS has advanced. The second is if + // all RHS are up to date for LHS. + Result> UpdateRhsAndCheckUpToDateWithLhs() { auto& lhs = *state_.at(0); auto lhs_latest_time = lhs.GetLatestTime(); - bool any_updated = false; - for (size_t i = 1; i < state_.size(); ++i) { - ARROW_ASSIGN_OR_RAISE(bool advanced, state_[i]->AdvanceAndMemoize(lhs_latest_time)); - any_updated |= advanced; - } - return any_updated; - } - - // Returns false if RHS not up to date for LHS - bool IsUpToDateWithLhsRow() const { - auto& lhs = *state_[0]; - if (lhs.Empty()) return false; // can't proceed if nothing on the LHS - OnType lhs_ts = lhs.GetLatestTime(); + bool any_advanced = false; + bool up_to_date_with_lhs = true; for (size_t i = 1; i < state_.size(); ++i) { auto& rhs = *state_[i]; - if (!rhs.Finished()) { + bool rhs_empty = rhs.Empty(); + + ARROW_ASSIGN_OR_RAISE(bool advanced, + rhs.AdvanceAndMemoize(lhs_latest_time, rhs_empty)); + any_advanced |= advanced; + + if (up_to_date_with_lhs && !rhs.Finished()) { // If RHS is finished, then we know it's up to date - if (rhs.CurrentEmpty()) - return false; // RHS isn't finished, but is empty --> not up to date - if (lhs_ts > rhs.GetCurrentTime()) - return false; // RHS isn't up to date (and not finished) + if (rhs.CurrentEmpty(rhs_empty)) { + // RHS isn't finished, but is empty --> not up to date + up_to_date_with_lhs = false; + } else if (lhs_latest_time > rhs.GetCurrentTime()) { + // RHS isn't up to date (and not finished) + up_to_date_with_lhs = false; + } } } - return true; + return std::make_pair(any_advanced, up_to_date_with_lhs); } Result> ProcessInner() { @@ -963,22 +967,17 @@ class AsofJoinNode : public ExecNode { // If LHS is finished or empty then there's nothing we can do here if (lhs.Finished() || lhs.Empty()) break; + ARROW_ASSIGN_OR_RAISE(auto pair, UpdateRhsAndCheckUpToDateWithLhs()); bool any_rhs_advanced{}; - bool is_up_to_date_with_lhs_row{}; - { - std::lock_guard guard(push_gate_); - // Advance each of the RHS as far as possible to be up to date for the LHS - // timestamp - ARROW_ASSIGN_OR_RAISE(any_rhs_advanced, UpdateRhs()); - - // If we have received enough inputs to produce the next output batch - // (decided by IsUpToDateWithLhsRow), we will perform the join and - // materialize the output batch. The join is done by advancing through - // the LHS and adding joined row to rows_ (done by Emplace). Finally, - // input batches that are no longer needed are removed to free up memory. - is_up_to_date_with_lhs_row = IsUpToDateWithLhsRow(); - } - if (is_up_to_date_with_lhs_row) { + bool rhs_up_to_date_with_lhs{}; + std::tie(any_rhs_advanced, rhs_up_to_date_with_lhs) = pair; + + // If we have received enough inputs to produce the next output batch + // (decided by IsUpToDateWithLhsRow), we will perform the join and + // materialize the output batch. The join is done by advancing through + // the LHS and adding joined row to rows_ (done by Emplace). Finally, + // input batches that are no longer needed are removed to free up memory. + if (rhs_up_to_date_with_lhs) { dst.Emplace(state_, tolerance_); ARROW_ASSIGN_OR_RAISE(bool advanced, lhs.Advance()); if (!advanced) break; // if we can't advance LHS, we're done for this batch @@ -1039,7 +1038,7 @@ class AsofJoinNode : public ExecNode { } bool Process() { - std::lock_guard guard(finish_gate_); + std::lock_guard guard(gate_); if (!CheckEnded()) { return false; } @@ -1401,17 +1400,14 @@ class AsofJoinNode : public ExecNode { DEBUG_SYNC(this, "received batch from input ", k, ":", DEBUG_MANIP(std::endl), rb->ToString(), DEBUG_MANIP(std::endl)); - { - std::lock_guard guard(push_gate_); - ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb)); - } + ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb)); process_.Push(true); return Status::OK(); } Status InputFinished(ExecNode* input, int total_batches) override { { - std::lock_guard guard(finish_gate_); + std::lock_guard guard(gate_); ARROW_DCHECK(std_has(inputs_, input)); size_t k = std_find(inputs_, input) - inputs_.begin(); state_.at(k)->set_total_batches(total_batches); @@ -1465,8 +1461,7 @@ class AsofJoinNode : public ExecNode { // InputStates // Each input state corresponds to an input table std::vector> state_; - std::mutex finish_gate_; - std::mutex push_gate_; + std::mutex gate_; TolType tolerance_; #ifndef NDEBUG std::ostream* debug_os_; diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 176e03477af..71606d372b0 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1678,14 +1678,11 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) { /*slow_r0=*/false); } -// GH-40675. -TEST(AsofJoinTest, Flaky1) { - std::vector left_types = {int64(), utf8()}; +TEST(AsofJoinTest, GH40675) { auto left_batch = ExecBatchFromJSON( - left_types, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, "f"]])"); - std::vector right_types = {int64(), utf8(), float64()}; - auto right_batch = - ExecBatchFromJSON(right_types, R"([[2, "a", 1.0], [9, "b", 3.0], [15, "g", 5.0]])"); + {int64(), utf8()}, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, "f"]])"); + auto right_batch = ExecBatchFromJSON( + {int64(), utf8(), float64()}, R"([[2, "a", 1.0], [9, "b", 3.0], [15, "g", 5.0]])"); Declaration left{ "exec_batch_source", @@ -1697,23 +1694,21 @@ TEST(AsofJoinTest, Flaky1) { field("colC", float64())}), {std::move(right_batch)})}; AsofJoinNodeOptions asof_join_opts({{{"colA"}, {{"col2"}}}, {{"colB"}, {{"col3"}}}}, 1); - Declaration asof_join{"asofjoin", {left, right}, asof_join_opts}; + Declaration asof_join{ + "asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)}; - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(asof_join)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join))); - std::vector exp_types = {int64(), utf8(), float64()}; auto exp_batch = ExecBatchFromJSON( - exp_types, + {int64(), utf8(), float64()}, R"([[1, "a", 1.0], [1, "b", null], [5, "a", null], [6, "b", null], [7, "f", null]])"); AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); } -// GH-41149. -TEST(AsofJoinTest, Flaky2) { - std::vector left_types = {int64()}; - auto left_batch = ExecBatchFromJSON(left_types, R"([[1], [2], [3]])"); - std::vector right_types = {utf8(), int64()}; - auto right_batch = ExecBatchFromJSON(right_types, R"([["Z", 2], ["B", 3], ["A", 4]])"); +TEST(AsofJoinTest, GH41149) { + auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])"); + auto right_batch = + ExecBatchFromJSON({utf8(), int64()}, R"([["Z", 2], ["B", 3], ["A", 4]])"); Declaration left{"exec_batch_source", ExecBatchSourceNodeOptions(schema({field("on", int64())}), @@ -1723,12 +1718,13 @@ TEST(AsofJoinTest, Flaky2) { ExecBatchSourceNodeOptions(schema({field("colVals", utf8()), field("on", int64())}), {std::move(right_batch)})}; AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1); - Declaration asof_join{"asofjoin", {left, right}, asof_join_opts}; + Declaration asof_join{ + "asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)}; - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(asof_join)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join))); - std::vector exp_types = {int64(), utf8()}; - auto exp_batch = ExecBatchFromJSON(exp_types, R"([[1, "Z"], [2, "Z"], [3, "B"]])"); + auto exp_batch = + ExecBatchFromJSON({int64(), utf8()}, R"([[1, "Z"], [2, "Z"], [3, "B"]])"); AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); } From 6427ce201372f33ade3a5faf6ca52dc7684c80a1 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Sat, 11 May 2024 01:57:04 +0800 Subject: [PATCH 06/13] Fix --- cpp/src/arrow/acero/asof_join_node.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 0b2775843f8..c8b733cb584 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -932,7 +932,12 @@ class AsofJoinNode : public ExecNode { bool up_to_date_with_lhs = true; for (size_t i = 1; i < state_.size(); ++i) { auto& rhs = *state_[i]; + + // Obtain RHS emptiness once for subsequent AdvanceAndMemoize() and CurrentEmpty(). bool rhs_empty = rhs.Empty(); + // Obtain RHS current time here because AdvanceAndMemoize() can change the + // emptiness. + OnType rhs_current_time = rhs_empty ? OnType{} : rhs.GetLatestTime(); ARROW_ASSIGN_OR_RAISE(bool advanced, rhs.AdvanceAndMemoize(lhs_latest_time, rhs_empty)); @@ -943,7 +948,7 @@ class AsofJoinNode : public ExecNode { if (rhs.CurrentEmpty(rhs_empty)) { // RHS isn't finished, but is empty --> not up to date up_to_date_with_lhs = false; - } else if (lhs_latest_time > rhs.GetCurrentTime()) { + } else if (lhs_latest_time > rhs_current_time) { // RHS isn't up to date (and not finished) up_to_date_with_lhs = false; } From 6bedcb2255b6d5b0bf6ba06718147f3a5af6d30a Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Sat, 11 May 2024 02:22:55 +0800 Subject: [PATCH 07/13] Refine --- cpp/src/arrow/acero/asof_join_node.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index c8b733cb584..bff230cfac1 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -972,10 +972,9 @@ class AsofJoinNode : public ExecNode { // If LHS is finished or empty then there's nothing we can do here if (lhs.Finished() || lhs.Empty()) break; - ARROW_ASSIGN_OR_RAISE(auto pair, UpdateRhsAndCheckUpToDateWithLhs()); bool any_rhs_advanced{}; bool rhs_up_to_date_with_lhs{}; - std::tie(any_rhs_advanced, rhs_up_to_date_with_lhs) = pair; + ARROW_ASSIGN_OR_RAISE(std::tie(any_rhs_advanced, rhs_up_to_date_with_lhs), UpdateRhsAndCheckUpToDateWithLhs()); // If we have received enough inputs to produce the next output batch // (decided by IsUpToDateWithLhsRow), we will perform the join and From 606f48c52d49dae3198b4dd93bf431c6916d9115 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Sat, 11 May 2024 02:25:35 +0800 Subject: [PATCH 08/13] Format --- cpp/src/arrow/acero/asof_join_node.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index bff230cfac1..90362a49ab3 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -974,7 +974,8 @@ class AsofJoinNode : public ExecNode { bool any_rhs_advanced{}; bool rhs_up_to_date_with_lhs{}; - ARROW_ASSIGN_OR_RAISE(std::tie(any_rhs_advanced, rhs_up_to_date_with_lhs), UpdateRhsAndCheckUpToDateWithLhs()); + ARROW_ASSIGN_OR_RAISE(std::tie(any_rhs_advanced, rhs_up_to_date_with_lhs), + UpdateRhsAndCheckUpToDateWithLhs()); // If we have received enough inputs to produce the next output batch // (decided by IsUpToDateWithLhsRow), we will perform the join and From 532d5e546f6ac2ad4e97185d407c5fc43ab438a1 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 13 May 2024 22:53:54 +0800 Subject: [PATCH 09/13] Fix comment about "atomic call" --- cpp/src/arrow/acero/asof_join_node.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 90362a49ab3..0867159dfc9 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -548,8 +548,8 @@ class InputState { // true when the queue is empty and, when memo may have future entries (the case of a // positive tolerance), when the memo is empty. // used when checking whether RHS is up to date with LHS. - // NOTE: The emptiness must be decided by an atomic all to Empty() in caller, due to the - // potential race with Push(), see GH-41614. + // NOTE: The emptiness must be decided by an single call to Empty() in caller, due to + // the potential race with Push(), see GH-41614. bool CurrentEmpty(bool empty) const { return memo_.no_future_ ? empty : memo_.times_.empty() && empty; } @@ -652,8 +652,8 @@ class InputState { // timestamp, update latest_time and latest_ref_row to the value that immediately pass // the horizon. Update the memo-store with any entries or future entries so observed. // Returns true if updates were made, false if not. - // NOTE: The emptiness must be decided by an atomic all to Empty() in caller, due to the - // potential race with Push(), see GH-41614. + // NOTE: The emptiness must be decided by an single call to Empty() in caller, due to + // the potential race with Push(), see GH-41614. Result AdvanceAndMemoize(OnType ts, bool empty) { // Advance the right side row index until we reach the latest right row (for each key) // for the given left timestamp. From 7d158754fda2e3034edd49fdac9ed6e4b30a16fd Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 13 May 2024 22:55:54 +0800 Subject: [PATCH 10/13] Parentheses to disambiguate operator precedence --- cpp/src/arrow/acero/asof_join_node.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 0867159dfc9..77245a36531 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -551,7 +551,7 @@ class InputState { // NOTE: The emptiness must be decided by an single call to Empty() in caller, due to // the potential race with Push(), see GH-41614. bool CurrentEmpty(bool empty) const { - return memo_.no_future_ ? empty : memo_.times_.empty() && empty; + return memo_.no_future_ ? empty : (memo_.times_.empty() && empty); } // in case memo may not have future entries (the case of a non-positive tolerance), From 104024eb7470dae117699b66dda7bbfa75caf5b5 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 13 May 2024 23:53:12 +0800 Subject: [PATCH 11/13] Use meaningful struct instead of std::pair --- cpp/src/arrow/acero/asof_join_node.cc | 38 +++++++++++++++------------ 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 77245a36531..1745c9e1b4f 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -922,14 +922,21 @@ class CompositeTableBuilder { // guaranteeing this probability is below 1 in a billion. The fix is 128-bit hashing. // See ARROW-17653 class AsofJoinNode : public ExecNode { - // Advances the RHS as far as possible to be up to date for the current LHS timestamp. - // Returns a pair of booleans. The first is if any RHS has advanced. The second is if - // all RHS are up to date for LHS. - Result> UpdateRhsAndCheckUpToDateWithLhs() { + // A simple wrapper for the result of a single call to UpdateRhs(), identifying: + // 1) If any RHS has advanced. + // 2) If all RHS are up to date with LHS. + struct RhsUpdateState { + bool any_advanced; + bool all_up_to_date_with_lhs; + }; + // Advances the RHS as far as possible to be up to date for the current LHS timestamp, + // and checks if all RHS are up to date with LHS. The reason they have to be performed + // together is that they both depend on the emptiness of the RHS, which can be changed + // by Push() executing in another thread. + Result UpdateRhs() { auto& lhs = *state_.at(0); auto lhs_latest_time = lhs.GetLatestTime(); - bool any_advanced = false; - bool up_to_date_with_lhs = true; + RhsUpdateState update_state{/*any_advanced=*/false, /*all_up_to_date_with_lhs=*/true}; for (size_t i = 1; i < state_.size(); ++i) { auto& rhs = *state_[i]; @@ -941,20 +948,20 @@ class AsofJoinNode : public ExecNode { ARROW_ASSIGN_OR_RAISE(bool advanced, rhs.AdvanceAndMemoize(lhs_latest_time, rhs_empty)); - any_advanced |= advanced; + update_state.any_advanced |= advanced; - if (up_to_date_with_lhs && !rhs.Finished()) { + if (update_state.all_up_to_date_with_lhs && !rhs.Finished()) { // If RHS is finished, then we know it's up to date if (rhs.CurrentEmpty(rhs_empty)) { // RHS isn't finished, but is empty --> not up to date - up_to_date_with_lhs = false; + update_state.all_up_to_date_with_lhs = false; } else if (lhs_latest_time > rhs_current_time) { // RHS isn't up to date (and not finished) - up_to_date_with_lhs = false; + update_state.all_up_to_date_with_lhs = false; } } } - return std::make_pair(any_advanced, up_to_date_with_lhs); + return update_state; } Result> ProcessInner() { @@ -972,22 +979,19 @@ class AsofJoinNode : public ExecNode { // If LHS is finished or empty then there's nothing we can do here if (lhs.Finished() || lhs.Empty()) break; - bool any_rhs_advanced{}; - bool rhs_up_to_date_with_lhs{}; - ARROW_ASSIGN_OR_RAISE(std::tie(any_rhs_advanced, rhs_up_to_date_with_lhs), - UpdateRhsAndCheckUpToDateWithLhs()); + ARROW_ASSIGN_OR_RAISE(auto rhs_update_state, UpdateRhs()); // If we have received enough inputs to produce the next output batch // (decided by IsUpToDateWithLhsRow), we will perform the join and // materialize the output batch. The join is done by advancing through // the LHS and adding joined row to rows_ (done by Emplace). Finally, // input batches that are no longer needed are removed to free up memory. - if (rhs_up_to_date_with_lhs) { + if (rhs_update_state.all_up_to_date_with_lhs) { dst.Emplace(state_, tolerance_); ARROW_ASSIGN_OR_RAISE(bool advanced, lhs.Advance()); if (!advanced) break; // if we can't advance LHS, we're done for this batch } else { - if (!any_rhs_advanced) break; // need to wait for new data + if (!rhs_update_state.any_advanced) break; // need to wait for new data } } From 843216e780296e9ef6f2bfc5031e34b9b6ada3d0 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 14 May 2024 00:13:37 +0800 Subject: [PATCH 12/13] More comments on test cases --- cpp/src/arrow/acero/asof_join_node_test.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 71606d372b0..051e280a4c5 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1678,7 +1678,9 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) { /*slow_r0=*/false); } -TEST(AsofJoinTest, GH40675) { +// Reproduction of GH-40675: A logical race between Process() and Push() that can be more +// easily observed with single small batch. +TEST(AsofJoinTest, RhsEmptinessRace) { auto left_batch = ExecBatchFromJSON( {int64(), utf8()}, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, "f"]])"); auto right_batch = ExecBatchFromJSON( @@ -1705,7 +1707,9 @@ TEST(AsofJoinTest, GH40675) { AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); } -TEST(AsofJoinTest, GH41149) { +// Reproduction of GH-41149: Another case of the same root cause as GH-40675, but with +// empty "by" columns. +TEST(AsofJoinTest, RhsEmptinessRaceEmptyBy) { auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])"); auto right_batch = ExecBatchFromJSON({utf8(), int64()}, R"([["Z", 2], ["B", 3], ["A", 4]])"); From 1b4b21ad21d3346ab30beb849ed0f0631d65ea43 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 14 May 2024 22:21:28 +0800 Subject: [PATCH 13/13] Fix typo --- cpp/src/arrow/acero/asof_join_node.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 1745c9e1b4f..1d94467df9e 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -548,8 +548,8 @@ class InputState { // true when the queue is empty and, when memo may have future entries (the case of a // positive tolerance), when the memo is empty. // used when checking whether RHS is up to date with LHS. - // NOTE: The emptiness must be decided by an single call to Empty() in caller, due to - // the potential race with Push(), see GH-41614. + // NOTE: The emptiness must be decided by a single call to Empty() in caller, due to the + // potential race with Push(), see GH-41614. bool CurrentEmpty(bool empty) const { return memo_.no_future_ ? empty : (memo_.times_.empty() && empty); } @@ -652,8 +652,8 @@ class InputState { // timestamp, update latest_time and latest_ref_row to the value that immediately pass // the horizon. Update the memo-store with any entries or future entries so observed. // Returns true if updates were made, false if not. - // NOTE: The emptiness must be decided by an single call to Empty() in caller, due to - // the potential race with Push(), see GH-41614. + // NOTE: The emptiness must be decided by a single call to Empty() in caller, due to the + // potential race with Push(), see GH-41614. Result AdvanceAndMemoize(OnType ts, bool empty) { // Advance the right side row index until we reach the latest right row (for each key) // for the given left timestamp.