From 7d40e92c7d96e34935b2af3f84f74d4e1d8cec49 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Fri, 2 Jun 2023 08:28:00 -0400 Subject: [PATCH 01/19] GH-35838: [C++] Backpressure broken in asof join node --- cpp/src/arrow/acero/asof_join_node.cc | 7 +- cpp/src/arrow/acero/asof_join_node_test.cc | 103 +++++++++++++++++++-- 2 files changed, 97 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index b92339b951b..5bdf402db9c 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -668,18 +668,19 @@ class InputState { static Result> Make( size_t index, TolType tolerance, bool must_hash, bool may_rehash, - KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output, + KeyHasher* key_hasher, ExecNode* input, AsofJoinNode* node, std::atomic& backpressure_counter, const std::shared_ptr& schema, const col_index_t time_col_index, const std::vector& key_col_index) { constexpr size_t low_threshold = 4, high_threshold = 8; std::unique_ptr backpressure_control = - std::make_unique(node, output, backpressure_counter); + std::make_unique(/*node=*/input, /*output=*/node, + backpressure_counter); ARROW_ASSIGN_OR_RAISE(auto handler, BackpressureHandler::Make(low_threshold, high_threshold, std::move(backpressure_control))); return std::make_unique(index, tolerance, must_hash, may_rehash, - key_hasher, output, std::move(handler), schema, + key_hasher, node, std::move(handler), schema, time_col_index, key_col_index); } diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index b113eb86e56..001db3ebdf6 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -31,6 +31,7 @@ #ifndef NDEBUG #include "arrow/acero/options_internal.h" #endif +#include "arrow/acero/map_node.h" #include "arrow/acero/test_nodes.h" #include "arrow/acero/test_util_internal.h" #include "arrow/acero/util.h" @@ -1381,18 +1382,88 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1)); ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2)); - Declaration l_src = { - "source", SourceNodeOptions( - l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))}; - Declaration r0_src = { - "source", SourceNodeOptions( - r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))}; - Declaration r1_src = { - "source", SourceNodeOptions( - r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))}; + struct BackpressureCounters { + int32_t pause_count = 0; + int32_t resume_count = 0; + }; + + struct BackpressureTestNodeOptions : public ExecNodeOptions { + BackpressureCounters* counters; + }; + + struct BackpressureTestNode : public MapNode { + BackpressureTestNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, + const BackpressureTestNodeOptions& options) + : MapNode(plan, inputs, output_schema), counters(options.counters) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "BackpressureTestNode")); + auto bp_options = static_cast(options); + return plan->EmplaceNode( + plan, inputs, inputs[0]->output_schema(), bp_options); + } + + const char* kind_name() const override { return "BackpressureTestNode"; } + Result ProcessBatch(ExecBatch batch) override { return batch; } + + void PauseProducing(ExecNode* output, int32_t counter) override { + ++counters->pause_count; + inputs()[0]->PauseProducing(this, counter); + } + void ResumeProducing(ExecNode* output, int32_t counter) override { + ++counters->resume_count; + inputs()[0]->ResumeProducing(this, counter); + } + + BackpressureCounters* counters; + }; + + auto exec_reg = default_exec_factory_registry(); + std::string bp_test = "backpressure_test"; + if (!exec_reg->GetFactory(bp_test).ok()) { + ASSERT_OK(exec_reg->AddFactory(bp_test, BackpressureTestNode::Make)); + } + + struct SourceConfig { + std::string name_prefix; + bool is_fast; + std::shared_ptr schema; + decltype(l_batches) batches; + + std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); } + }; + + // must have at least one fast and one slow + std::vector source_configs = { + {"0", true, l_schema, l_batches}, + {"1", false, r0_schema, r0_batches}, + {"2", true, r1_schema, r1_batches}, + }; + + std::vector bp_counters(source_configs.size()); + std::vector src_decls; + std::vector> bp_options; + std::vector bp_decls; + for (size_t i = 0; i < source_configs.size(); i++) { + const auto& config = source_configs[i]; + src_decls.emplace_back( + "source", SourceNodeOptions( + config.schema, + MakeDelayedGen(config.batches, config.name(), + config.is_fast ? fast_delay : slow_delay, noisy))); + bp_options.push_back(std::make_shared()); + bp_options.back()->counters = &bp_counters[i]; + std::shared_ptr options = bp_options.back(); + std::vector bp_in = {src_decls.back()}; + Declaration bp_decl = {bp_test, bp_in, std::move(options)}; + bp_decls.push_back(bp_decl); + } Declaration asofjoin = { - "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)}; + "asofjoin", bp_decls, + GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)}; ASSERT_OK_AND_ASSIGN(std::unique_ptr batch_reader, DeclarationToReader(asofjoin, /*use_threads=*/false)); @@ -1406,6 +1477,18 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, total_length += batch->num_rows(); } ASSERT_EQ(static_cast(num_batches * batch_size), total_length); + + std::unordered_map counters_by_is_fast; + for (size_t i = 0; i < source_configs.size(); i++) { + BackpressureCounters& counters = counters_by_is_fast[source_configs[i].is_fast]; + counters.pause_count += bp_counters[i].pause_count; + counters.resume_count += bp_counters[i].resume_count; + } + ASSERT_EQ(counters_by_is_fast.size(), 2); + ASSERT_EQ(counters_by_is_fast[false].pause_count, 0); + ASSERT_EQ(counters_by_is_fast[false].resume_count, 0); + ASSERT_GT(counters_by_is_fast[true].pause_count, 0); + ASSERT_GT(counters_by_is_fast[true].resume_count, 0); } TEST(AsofJoinTest, BackpressureWithBatches) { From 3e5fdf9a4a7e48500882f1592d1949c772dca358 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Fri, 2 Jun 2023 09:00:10 -0400 Subject: [PATCH 02/19] improve parameter names, attempt to fix flaky test --- cpp/src/arrow/acero/asof_join_node.cc | 8 ++++---- cpp/src/arrow/acero/asof_join_node_test.cc | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 5bdf402db9c..6928c823e02 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -668,19 +668,19 @@ class InputState { static Result> Make( size_t index, TolType tolerance, bool must_hash, bool may_rehash, - KeyHasher* key_hasher, ExecNode* input, AsofJoinNode* node, + KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node, std::atomic& backpressure_counter, const std::shared_ptr& schema, const col_index_t time_col_index, const std::vector& key_col_index) { constexpr size_t low_threshold = 4, high_threshold = 8; std::unique_ptr backpressure_control = - std::make_unique(/*node=*/input, /*output=*/node, - backpressure_counter); + std::make_unique( + /*node=*/asof_input, /*output=*/asof_node, backpressure_counter); ARROW_ASSIGN_OR_RAISE(auto handler, BackpressureHandler::Make(low_threshold, high_threshold, std::move(backpressure_control))); return std::make_unique(index, tolerance, must_hash, may_rehash, - key_hasher, node, std::move(handler), schema, + key_hasher, asof_node, std::move(handler), schema, time_col_index, key_col_index); } diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 001db3ebdf6..c2e02c76967 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1485,10 +1485,12 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, counters.resume_count += bp_counters[i].resume_count; } ASSERT_EQ(counters_by_is_fast.size(), 2); - ASSERT_EQ(counters_by_is_fast[false].pause_count, 0); - ASSERT_EQ(counters_by_is_fast[false].resume_count, 0); ASSERT_GT(counters_by_is_fast[true].pause_count, 0); ASSERT_GT(counters_by_is_fast[true].resume_count, 0); + // runs on some slow machines may not see any pause/resume, but if at least one pause is + // seen then at least one resume must also be seen + ASSERT_EQ(counters_by_is_fast[false].pause_count > 0, + counters_by_is_fast[false].resume_count > 0); } TEST(AsofJoinTest, BackpressureWithBatches) { From 3b85e8d3ef32236b094768325d8e5773e09e19f8 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Fri, 2 Jun 2023 10:59:50 -0400 Subject: [PATCH 03/19] another attempt --- cpp/src/arrow/acero/asof_join_node_test.cc | 23 +++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index c2e02c76967..3cdea8ffcf0 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1388,14 +1388,17 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, }; struct BackpressureTestNodeOptions : public ExecNodeOptions { - BackpressureCounters* counters; + double initial_sleep_seconds = 0; + BackpressureCounters* counters = nullptr; }; struct BackpressureTestNode : public MapNode { BackpressureTestNode(ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, const BackpressureTestNodeOptions& options) - : MapNode(plan, inputs, output_schema), counters(options.counters) {} + : MapNode(plan, inputs, output_schema), + counters(options.counters), + sleep_seconds(options.initial_sleep_seconds) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -1406,7 +1409,13 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, } const char* kind_name() const override { return "BackpressureTestNode"; } - Result ProcessBatch(ExecBatch batch) override { return batch; } + Result ProcessBatch(ExecBatch batch) override { + if (sleep_seconds > 0) { + SleepFor(sleep_seconds); + sleep_seconds = 0; + } + return batch; + } void PauseProducing(ExecNode* output, int32_t counter) override { ++counters->pause_count; @@ -1418,6 +1427,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, } BackpressureCounters* counters; + double sleep_seconds; }; auto exec_reg = default_exec_factory_registry(); @@ -1455,6 +1465,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, config.is_fast ? fast_delay : slow_delay, noisy))); bp_options.push_back(std::make_shared()); bp_options.back()->counters = &bp_counters[i]; + bp_options.back()->initial_sleep_seconds = config.is_fast ? 0 : 2; std::shared_ptr options = bp_options.back(); std::vector bp_in = {src_decls.back()}; Declaration bp_decl = {bp_test, bp_in, std::move(options)}; @@ -1485,12 +1496,10 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, counters.resume_count += bp_counters[i].resume_count; } ASSERT_EQ(counters_by_is_fast.size(), 2); + ASSERT_EQ(counters_by_is_fast[false].pause_count, 0); + ASSERT_EQ(counters_by_is_fast[false].resume_count, 0); ASSERT_GT(counters_by_is_fast[true].pause_count, 0); ASSERT_GT(counters_by_is_fast[true].resume_count, 0); - // runs on some slow machines may not see any pause/resume, but if at least one pause is - // seen then at least one resume must also be seen - ASSERT_EQ(counters_by_is_fast[false].pause_count > 0, - counters_by_is_fast[false].resume_count > 0); } TEST(AsofJoinTest, BackpressureWithBatches) { From 3a111e3b52176fd7a75f2425d60c8c828ed88861 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Fri, 2 Jun 2023 16:09:03 -0400 Subject: [PATCH 04/19] added delay node --- cpp/src/arrow/acero/asof_join_node_test.cc | 174 +++++++++++++-------- 1 file changed, 105 insertions(+), 69 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 3cdea8ffcf0..b8243545250 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1361,6 +1361,91 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, { schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())})); }) +struct BackpressureCounters { + int32_t pause_count = 0; + int32_t resume_count = 0; +}; + +struct BackpressureCountingNodeOptions : public ExecNodeOptions { + BackpressureCounters* counters; +}; + +struct BackpressureCountingNode : public MapNode { + static constexpr const char* kKindName = "BackpressureCountingNode"; + static constexpr const char* kFactoryName = "backpressure_count"; + + static void Register() { + auto exec_reg = default_exec_factory_registry(); + if (!exec_reg->GetFactory(kFactoryName).ok()) { + ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make)); + } + } + + BackpressureCountingNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, + const BackpressureCountingNodeOptions& options) + : MapNode(plan, inputs, output_schema), counters(options.counters) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName)); + auto bp_options = static_cast(options); + return plan->EmplaceNode( + plan, inputs, inputs[0]->output_schema(), bp_options); + } + + const char* kind_name() const override { return kKindName; } + Result ProcessBatch(ExecBatch batch) override { return batch; } + + void PauseProducing(ExecNode* output, int32_t counter) override { + ++counters->pause_count; + inputs()[0]->PauseProducing(this, counter); + } + void ResumeProducing(ExecNode* output, int32_t counter) override { + ++counters->resume_count; + inputs()[0]->ResumeProducing(this, counter); + } + + BackpressureCounters* counters; +}; + +struct BackpressureDelayingNodeOptions : public ExecNodeOptions { + double delay_seconds; +}; + +struct BackpressureDelayingNode : public MapNode { + static constexpr auto kKindName = "BackpressureDelayingNode"; + static constexpr const char* kFactoryName = "backpressure_delay"; + + static void Register() { + auto exec_reg = default_exec_factory_registry(); + if (!exec_reg->GetFactory(kFactoryName).ok()) { + ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureDelayingNode::Make)); + } + } + + BackpressureDelayingNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, + const BackpressureDelayingNodeOptions& options) + : MapNode(plan, inputs, output_schema), delay_seconds(options.delay_seconds) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName)); + auto bp_options = static_cast(options); + return plan->EmplaceNode( + plan, inputs, inputs[0]->output_schema(), bp_options); + } + + const char* kind_name() const override { return kKindName; } + Result ProcessBatch(ExecBatch batch) override { + SleepFor(delay_seconds); + return batch; + } + + double delay_seconds; +}; + template void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, double fast_delay, double slow_delay, bool noisy = false) { @@ -1382,61 +1467,10 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1)); ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2)); - struct BackpressureCounters { - int32_t pause_count = 0; - int32_t resume_count = 0; - }; - - struct BackpressureTestNodeOptions : public ExecNodeOptions { - double initial_sleep_seconds = 0; - BackpressureCounters* counters = nullptr; - }; - - struct BackpressureTestNode : public MapNode { - BackpressureTestNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, - const BackpressureTestNodeOptions& options) - : MapNode(plan, inputs, output_schema), - counters(options.counters), - sleep_seconds(options.initial_sleep_seconds) {} - - static Result Make(ExecPlan* plan, std::vector inputs, - const ExecNodeOptions& options) { - RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "BackpressureTestNode")); - auto bp_options = static_cast(options); - return plan->EmplaceNode( - plan, inputs, inputs[0]->output_schema(), bp_options); - } - - const char* kind_name() const override { return "BackpressureTestNode"; } - Result ProcessBatch(ExecBatch batch) override { - if (sleep_seconds > 0) { - SleepFor(sleep_seconds); - sleep_seconds = 0; - } - return batch; - } - - void PauseProducing(ExecNode* output, int32_t counter) override { - ++counters->pause_count; - inputs()[0]->PauseProducing(this, counter); - } - void ResumeProducing(ExecNode* output, int32_t counter) override { - ++counters->resume_count; - inputs()[0]->ResumeProducing(this, counter); - } + BackpressureCountingNode::Register(); + BackpressureDelayingNode::Register(); - BackpressureCounters* counters; - double sleep_seconds; - }; - - auto exec_reg = default_exec_factory_registry(); - std::string bp_test = "backpressure_test"; - if (!exec_reg->GetFactory(bp_test).ok()) { - ASSERT_OK(exec_reg->AddFactory(bp_test, BackpressureTestNode::Make)); - } - - struct SourceConfig { + struct BackpressureSourceConfig { std::string name_prefix; bool is_fast; std::shared_ptr schema; @@ -1446,7 +1480,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, }; // must have at least one fast and one slow - std::vector source_configs = { + std::vector source_configs = { {"0", true, l_schema, l_batches}, {"1", false, r0_schema, r0_batches}, {"2", true, r1_schema, r1_batches}, @@ -1454,7 +1488,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, std::vector bp_counters(source_configs.size()); std::vector src_decls; - std::vector> bp_options; + std::vector> bp_options; std::vector bp_decls; for (size_t i = 0; i < source_configs.size(); i++) { const auto& config = source_configs[i]; @@ -1463,12 +1497,12 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, config.schema, MakeDelayedGen(config.batches, config.name(), config.is_fast ? fast_delay : slow_delay, noisy))); - bp_options.push_back(std::make_shared()); + bp_options.push_back(std::make_shared()); bp_options.back()->counters = &bp_counters[i]; - bp_options.back()->initial_sleep_seconds = config.is_fast ? 0 : 2; std::shared_ptr options = bp_options.back(); std::vector bp_in = {src_decls.back()}; - Declaration bp_decl = {bp_test, bp_in, std::move(options)}; + Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in, + std::move(options)}; bp_decls.push_back(bp_decl); } @@ -1476,8 +1510,13 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, "asofjoin", bp_decls, GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)}; + BackpressureDelayingNodeOptions delay_options; + delay_options.delay_seconds = slow_delay * 5; + Declaration delaying = { + BackpressureDelayingNode::kFactoryName, {asofjoin}, delay_options}; + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch_reader, - DeclarationToReader(asofjoin, /*use_threads=*/false)); + DeclarationToReader(delaying, /*use_threads=*/false)); int64_t total_length = 0; for (;;) { @@ -1489,17 +1528,14 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, } ASSERT_EQ(static_cast(num_batches * batch_size), total_length); - std::unordered_map counters_by_is_fast; + size_t total_pause_count = 0, total_resume_count = 0; + ; for (size_t i = 0; i < source_configs.size(); i++) { - BackpressureCounters& counters = counters_by_is_fast[source_configs[i].is_fast]; - counters.pause_count += bp_counters[i].pause_count; - counters.resume_count += bp_counters[i].resume_count; + if (bp_counters[i].pause_count > 0) total_pause_count++; + if (bp_counters[i].resume_count > 0) total_resume_count++; } - ASSERT_EQ(counters_by_is_fast.size(), 2); - ASSERT_EQ(counters_by_is_fast[false].pause_count, 0); - ASSERT_EQ(counters_by_is_fast[false].resume_count, 0); - ASSERT_GT(counters_by_is_fast[true].pause_count, 0); - ASSERT_GT(counters_by_is_fast[true].resume_count, 0); + ASSERT_GT(total_pause_count, 0); + ASSERT_GT(total_resume_count, 0); } TEST(AsofJoinTest, BackpressureWithBatches) { From 17397d473975737d5366a25e0a2adf410d2b2791 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Fri, 2 Jun 2023 16:34:32 -0400 Subject: [PATCH 05/19] lint --- cpp/src/arrow/acero/asof_join_node_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index b8243545250..aa88e8e2a32 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1529,7 +1529,6 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, ASSERT_EQ(static_cast(num_batches * batch_size), total_length); size_t total_pause_count = 0, total_resume_count = 0; - ; for (size_t i = 0; i < source_configs.size(); i++) { if (bp_counters[i].pause_count > 0) total_pause_count++; if (bp_counters[i].resume_count > 0) total_resume_count++; From a2bfd6a4b6f2065498fdd9f223ebb4751d6d04e6 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Sat, 3 Jun 2023 04:07:25 -0400 Subject: [PATCH 06/19] add gate --- cpp/src/arrow/acero/asof_join_node_test.cc | 29 +++++++++++++++++----- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index aa88e8e2a32..56a2bddfa47 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1367,6 +1367,8 @@ struct BackpressureCounters { }; struct BackpressureCountingNodeOptions : public ExecNodeOptions { + BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {} + BackpressureCounters* counters; }; @@ -1410,7 +1412,11 @@ struct BackpressureCountingNode : public MapNode { }; struct BackpressureDelayingNodeOptions : public ExecNodeOptions { + BackpressureDelayingNodeOptions(double delay_seconds, std::function gate) + : delay_seconds(delay_seconds), gate(gate) {} + double delay_seconds; + std::function gate; }; struct BackpressureDelayingNode : public MapNode { @@ -1427,7 +1433,9 @@ struct BackpressureDelayingNode : public MapNode { BackpressureDelayingNode(ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, const BackpressureDelayingNodeOptions& options) - : MapNode(plan, inputs, output_schema), delay_seconds(options.delay_seconds) {} + : MapNode(plan, inputs, output_schema), + gate(options.gate), + delay_seconds(options.delay_seconds) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -1439,10 +1447,13 @@ struct BackpressureDelayingNode : public MapNode { const char* kind_name() const override { return kKindName; } Result ProcessBatch(ExecBatch batch) override { - SleepFor(delay_seconds); + while (!gate()) { + SleepFor(delay_seconds); + } return batch; } + std::function gate; double delay_seconds; }; @@ -1497,8 +1508,8 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, config.schema, MakeDelayedGen(config.batches, config.name(), config.is_fast ? fast_delay : slow_delay, noisy))); - bp_options.push_back(std::make_shared()); - bp_options.back()->counters = &bp_counters[i]; + bp_options.push_back( + std::make_shared(&bp_counters[i])); std::shared_ptr options = bp_options.back(); std::vector bp_in = {src_decls.back()}; Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in, @@ -1510,8 +1521,14 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, "asofjoin", bp_decls, GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)}; - BackpressureDelayingNodeOptions delay_options; - delay_options.delay_seconds = slow_delay * 5; + BackpressureDelayingNodeOptions delay_options(slow_delay * 5, [&bp_counters]() { + for (const auto& counters : bp_counters) { + if (counters.pause_count > 0 || counters.resume_count > 0) { + return true; + } + } + return false; + }); Declaration delaying = { BackpressureDelayingNode::kFactoryName, {asofjoin}, delay_options}; From 8e2570ec2a7a675fd3152fa3ee3ae117da248da4 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Sun, 4 Jun 2023 06:21:12 -0400 Subject: [PATCH 07/19] another attempt --- cpp/src/arrow/acero/asof_join_node_test.cc | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 56a2bddfa47..eeb261b7fdd 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1545,13 +1545,12 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, } ASSERT_EQ(static_cast(num_batches * batch_size), total_length); - size_t total_pause_count = 0, total_resume_count = 0; - for (size_t i = 0; i < source_configs.size(); i++) { - if (bp_counters[i].pause_count > 0) total_pause_count++; - if (bp_counters[i].resume_count > 0) total_resume_count++; + size_t total_count = 0; + for (const auto& counters : bp_counters) { + total_count += counters.pause_count; + total_count += counters.resume_count; } - ASSERT_GT(total_pause_count, 0); - ASSERT_GT(total_resume_count, 0); + ASSERT_GT(total_count, 0); } TEST(AsofJoinTest, BackpressureWithBatches) { From 6db6574432d1c5f997b2496d5f7072a03dbcb946 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Sun, 4 Jun 2023 06:40:44 -0400 Subject: [PATCH 08/19] double number of batches --- cpp/src/arrow/acero/asof_join_node_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index eeb261b7fdd..1f8ad6cfbfe 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1554,7 +1554,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, } TEST(AsofJoinTest, BackpressureWithBatches) { - return TestBackpressure(MakeIntegerBatches, /*num_batches=*/10, /*batch_size=*/1, + return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1, /*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false); } @@ -1618,7 +1618,7 @@ T GetEnvValue(const std::string& var, T default_value) { } // namespace TEST(AsofJoinTest, BackpressureWithBatchesGen) { - int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 10); + int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20); int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1); return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size, /*fast_delay=*/0.001, /*slow_delay=*/0.01); From 380aa94bd04df435a0be7c6ee05a5e2581008f77 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Mon, 5 Jun 2023 09:55:48 -0400 Subject: [PATCH 09/19] fix data races --- cpp/src/arrow/acero/asof_join_node.cc | 2 +- cpp/src/arrow/acero/asof_join_node_test.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 6928c823e02..cf2a8d616b8 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -529,7 +529,7 @@ class KeyHasher { size_t index_; std::vector indices_; std::vector metadata_; - const RecordBatch* batch_; + std::atomic batch_; std::vector hashes_; LightContext ctx_; std::vector column_arrays_; diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 1f8ad6cfbfe..5dbd78c06a2 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1362,8 +1362,8 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, { }) struct BackpressureCounters { - int32_t pause_count = 0; - int32_t resume_count = 0; + std::atomic pause_count = 0; + std::atomic resume_count = 0; }; struct BackpressureCountingNodeOptions : public ExecNodeOptions { From 6eae28fb98ddb7d983eea6a65d92dc88dfb12942 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Sat, 10 Jun 2023 14:23:41 -0400 Subject: [PATCH 10/19] remove delay_seconds, fix key hasher invalidation --- cpp/src/arrow/acero/asof_join_node_test.cc | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 5dbd78c06a2..7af1379d5a2 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1412,10 +1412,8 @@ struct BackpressureCountingNode : public MapNode { }; struct BackpressureDelayingNodeOptions : public ExecNodeOptions { - BackpressureDelayingNodeOptions(double delay_seconds, std::function gate) - : delay_seconds(delay_seconds), gate(gate) {} + explicit BackpressureDelayingNodeOptions(std::function gate) : gate(gate) {} - double delay_seconds; std::function gate; }; @@ -1433,9 +1431,7 @@ struct BackpressureDelayingNode : public MapNode { BackpressureDelayingNode(ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, const BackpressureDelayingNodeOptions& options) - : MapNode(plan, inputs, output_schema), - gate(options.gate), - delay_seconds(options.delay_seconds) {} + : MapNode(plan, inputs, output_schema), gate(options.gate) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -1448,13 +1444,12 @@ struct BackpressureDelayingNode : public MapNode { const char* kind_name() const override { return kKindName; } Result ProcessBatch(ExecBatch batch) override { while (!gate()) { - SleepFor(delay_seconds); + SleepABit(); } return batch; } std::function gate; - double delay_seconds; }; template @@ -1521,7 +1516,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, "asofjoin", bp_decls, GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)}; - BackpressureDelayingNodeOptions delay_options(slow_delay * 5, [&bp_counters]() { + BackpressureDelayingNodeOptions delay_options([&bp_counters]() { for (const auto& counters : bp_counters) { if (counters.pause_count > 0 || counters.resume_count > 0) { return true; From e29520b4974e9bbc4ef08687913ce0199ab08b8a Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Wed, 14 Jun 2023 06:20:49 -0400 Subject: [PATCH 11/19] simplify concurrency --- cpp/src/arrow/acero/asof_join_node.cc | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index cf2a8d616b8..d9c273a72ba 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -370,15 +370,10 @@ struct MemoStore { times_.swap(memo.times_); } - // Updates the current time to `ts` if it is less. A different thread may win the race - // to update the current time to more than `ts` but not to less. Returns whether the - // current time was changed from its value at the beginning of this invocation. + // Updates the current time to `ts` if it is less. Returns true if updated. bool UpdateTime(OnType ts) { - OnType prev_time = current_time_; - bool update = prev_time < ts; - while (prev_time < ts && !current_time_.compare_exchange_weak(prev_time, ts)) { - // intentionally empty - standard CAS loop - } + bool update = current_time_ < ts; + if (update) current_time_ = ts; return update; } @@ -822,8 +817,11 @@ class InputState { ++batches_processed_; latest_ref_row_ = 0; have_active_batch &= !queue_.TryPop(); - if (have_active_batch) + if (have_active_batch) { DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed + key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache + memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed + } } } return have_active_batch; @@ -899,8 +897,6 @@ class InputState { Status Push(const std::shared_ptr& rb) { if (rb->num_rows() > 0) { - key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache - memo_.UpdateTime(GetTime(rb.get(), 0)); // time changed - update in MemoStore queue_.Push(rb); // only after above updates - push batch for processing } else { ++batches_processed_; // don't enqueue empty batches, just record as processed From 0cf8c4724c101bbbf126db05544c9b651f9f288c Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Jun 2023 05:58:12 -0700 Subject: [PATCH 12/19] Convert the delaying node to a gated node to demonstrate my original idea --- cpp/src/arrow/acero/asof_join_node_test.cc | 180 +++++++++++++++------ 1 file changed, 134 insertions(+), 46 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 7af1379d5a2..03d28aa2c51 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -19,9 +19,12 @@ #include #include +#include #include #include #include +#include "arrow/acero/exec_plan.h" +#include "arrow/testing/future_util.h" #ifndef NDEBUG #include #endif @@ -32,6 +35,7 @@ #include "arrow/acero/options_internal.h" #endif #include "arrow/acero/map_node.h" +#include "arrow/acero/query_context.h" #include "arrow/acero/test_nodes.h" #include "arrow/acero/test_util_internal.h" #include "arrow/acero/util.h" @@ -44,6 +48,7 @@ #include "arrow/testing/random.h" #include "arrow/util/checked_cast.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" #define TRACED_TEST(t_class, t_name, t_body) \ TEST(t_class, t_name) { \ @@ -1411,47 +1416,135 @@ struct BackpressureCountingNode : public MapNode { BackpressureCounters* counters; }; -struct BackpressureDelayingNodeOptions : public ExecNodeOptions { - explicit BackpressureDelayingNodeOptions(std::function gate) : gate(gate) {} +class Gate { + public: + void ReleaseAllBatches() { + std::lock_guard lg(mutex_); + num_allowed_batches_ = -1; + NotifyAll(); + } + + void ReleaseOneBatch() { + std::lock_guard lg(mutex_); + DCHECK_GE(num_allowed_batches_, 0) + << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()"; + num_allowed_batches_++; + NotifyAll(); + } + + Future<> WaitForNextReleasedBatch() { + std::lock_guard lg(mutex_); + if (current_waiter_.is_valid()) { + return current_waiter_; + } + Future<> fut; + if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) { + num_released_batches_++; + return Future<>::MakeFinished(); + } + + current_waiter_ = Future<>::Make(); + return current_waiter_; + } + + private: + void NotifyAll() { + if (current_waiter_.is_valid()) { + Future<> to_unlock = current_waiter_; + current_waiter_ = {}; + to_unlock.MarkFinished(); + } + } - std::function gate; + Future<> current_waiter_; + int num_released_batches_ = 0; + int num_allowed_batches_ = 0; + std::mutex mutex_; }; -struct BackpressureDelayingNode : public MapNode { +struct GatedNodeOptions : public ExecNodeOptions { + explicit GatedNodeOptions(Gate* gate) : gate(gate) {} + Gate* gate; +}; + +struct GatedNode : public ExecNode, public TracedNode { static constexpr auto kKindName = "BackpressureDelayingNode"; static constexpr const char* kFactoryName = "backpressure_delay"; static void Register() { auto exec_reg = default_exec_factory_registry(); if (!exec_reg->GetFactory(kFactoryName).ok()) { - ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureDelayingNode::Make)); + ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make)); } } - BackpressureDelayingNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, - const BackpressureDelayingNodeOptions& options) - : MapNode(plan, inputs, output_schema), gate(options.gate) {} + GatedNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, const GatedNodeOptions& options) + : ExecNode(plan, inputs, {"input"}, output_schema), + TracedNode(this), + gate_(options.gate) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName)); - auto bp_options = static_cast(options); - return plan->EmplaceNode( - plan, inputs, inputs[0]->output_schema(), bp_options); + auto gated_node_opts = static_cast(options); + return plan->EmplaceNode(plan, inputs, inputs[0]->output_schema(), + gated_node_opts); } const char* kind_name() const override { return kKindName; } - Result ProcessBatch(ExecBatch batch) override { - while (!gate()) { - SleepABit(); + + const Ordering& ordering() const override { return inputs_[0]->ordering(); } + Status InputFinished(ExecNode* input, int total_batches) override { + return output_->InputFinished(this, total_batches); + } + Status StartProducing() override { + NoteStartProducing(ToStringExtra()); + return Status::OK(); + } + + void PauseProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->PauseProducing(this, counter); + } + + void ResumeProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->ResumeProducing(this, counter); + } + + Status StopProducingImpl() override { return Status::OK(); } + + Status InputReceived(ExecNode* input, ExecBatch batch) override { + auto scope = TraceInputReceived(batch); + DCHECK_EQ(input, inputs_[0]); + + // If we are ready to release the batch, do so immediately. + Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch(); + if (maybe_unlocked.is_finished()) { + return output_->InputReceived(this, std::move(batch)); } - return batch; + + // Otherwise, we will wait for the gate to notify us and then check if we are + // ready to relese a batch again. + maybe_unlocked.AddCallback([this, input, batch](const Status& st) { + DCHECK_OK(st); + plan_->query_context()->ScheduleTask( + [this, input, batch] { return InputReceived(input, batch); }, + "GatedNode::ResumeAfterNotify"); + }); + return Status::OK(); } - std::function gate; + Gate* gate_; }; +AsyncGenerator> GetGen( + AsyncGenerator> gen) { + return gen; +} +AsyncGenerator> GetGen(BatchesWithSchema bws) { + return bws.gen(false, false); +} + template void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, double fast_delay, double slow_delay, bool noisy = false) { @@ -1474,7 +1567,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2)); BackpressureCountingNode::Register(); - BackpressureDelayingNode::Register(); + GatedNode::Register(); struct BackpressureSourceConfig { std::string name_prefix; @@ -1485,6 +1578,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); } }; + Gate gate; + GatedNodeOptions gate_options(&gate); + // must have at least one fast and one slow std::vector source_configs = { {"0", true, l_schema, l_batches}, @@ -1498,11 +1594,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, std::vector bp_decls; for (size_t i = 0; i < source_configs.size(); i++) { const auto& config = source_configs[i]; - src_decls.emplace_back( - "source", SourceNodeOptions( - config.schema, - MakeDelayedGen(config.batches, config.name(), - config.is_fast ? fast_delay : slow_delay, noisy))); + + src_decls.emplace_back("source", + SourceNodeOptions(config.schema, GetGen(config.batches))); bp_options.push_back( std::make_shared(&bp_counters[i])); std::shared_ptr options = bp_options.back(); @@ -1516,36 +1610,30 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, "asofjoin", bp_decls, GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)}; - BackpressureDelayingNodeOptions delay_options([&bp_counters]() { + ASSERT_OK_AND_ASSIGN(std::shared_ptr tpool, + internal::ThreadPool::Make(1)); + ExecContext exec_ctx(default_memory_pool(), tpool.get()); + Future batches_fut = + DeclarationToExecBatchesAsync(asofjoin, exec_ctx); + + BusyWait(10.0, [&] { + int total_paused = 0; for (const auto& counters : bp_counters) { - if (counters.pause_count > 0 || counters.resume_count > 0) { - return true; - } + total_paused += counters.pause_count; } - return false; + // One of the inputs is gated. The other two will eventually be paused by the asof + // join node + return total_paused >= 2; }); - Declaration delaying = { - BackpressureDelayingNode::kFactoryName, {asofjoin}, delay_options}; - - ASSERT_OK_AND_ASSIGN(std::unique_ptr batch_reader, - DeclarationToReader(delaying, /*use_threads=*/false)); - int64_t total_length = 0; - for (;;) { - ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next()); - if (!batch) { - break; - } - total_length += batch->num_rows(); - } - ASSERT_EQ(static_cast(num_batches * batch_size), total_length); + gate.ReleaseAllBatches(); + ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut); - size_t total_count = 0; + size_t total_resumed = 0; for (const auto& counters : bp_counters) { - total_count += counters.pause_count; - total_count += counters.resume_count; + total_resumed += counters.resume_count; } - ASSERT_GT(total_count, 0); + ASSERT_GE(total_resumed, 2); } TEST(AsofJoinTest, BackpressureWithBatches) { From 1edc44638175cee358333032e68aa27fdeb783b7 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 14 Jun 2023 06:34:20 -0700 Subject: [PATCH 13/19] Fix the test to actually use the gated node. Fix the gated node so that it delivers batches in order. --- cpp/src/arrow/acero/asof_join_node_test.cc | 88 ++++++++++++++-------- 1 file changed, 57 insertions(+), 31 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 03d28aa2c51..e264bf81511 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1513,28 +1513,50 @@ struct GatedNode : public ExecNode, public TracedNode { Status StopProducingImpl() override { return Status::OK(); } + Status SendBatchesUnlocked(std::unique_lock&& lock) { + while (!queued_batches_.empty()) { + // If we are ready to release the batch, do so immediately. + Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch(); + bool callback_added = maybe_unlocked.TryAddCallback([this] { + return [this](const Status& st) { + DCHECK_OK(st); + plan_->query_context()->ScheduleTask( + [this] { + std::unique_lock lk(mutex_); + return SendBatchesUnlocked(std::move(lk)); + }, + "GatedNode::ResumeAfterNotify"); + }; + }); + if (callback_added) { + break; + } + // Otherwise, the future is already finished which means the gate is unlocked + // and we are allowed to send a batch + ExecBatch next = std::move(queued_batches_.front()); + queued_batches_.pop(); + lock.unlock(); + ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next))); + lock.lock(); + } + return Status::OK(); + } + Status InputReceived(ExecNode* input, ExecBatch batch) override { auto scope = TraceInputReceived(batch); DCHECK_EQ(input, inputs_[0]); - // If we are ready to release the batch, do so immediately. - Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch(); - if (maybe_unlocked.is_finished()) { - return output_->InputReceived(this, std::move(batch)); - } + // This may be called concurrently by the source and by a restart attempt. Process + // one at a time (this critical section should be pretty small) + std::unique_lock lk(mutex_); + queued_batches_.push(std::move(batch)); - // Otherwise, we will wait for the gate to notify us and then check if we are - // ready to relese a batch again. - maybe_unlocked.AddCallback([this, input, batch](const Status& st) { - DCHECK_OK(st); - plan_->query_context()->ScheduleTask( - [this, input, batch] { return InputReceived(input, batch); }, - "GatedNode::ResumeAfterNotify"); - }); - return Status::OK(); + return SendBatchesUnlocked(std::move(lk)); } Gate* gate_; + std::queue queued_batches_; + std::mutex mutex_; }; AsyncGenerator> GetGen( @@ -1546,8 +1568,7 @@ AsyncGenerator> GetGen(BatchesWithSchema bws) { } template -void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, - double fast_delay, double slow_delay, bool noisy = false) { +void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { auto l_schema = schema({field("time", int32()), field("key", int32()), field("l_value", int32())}); auto r0_schema = @@ -1571,21 +1592,23 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, struct BackpressureSourceConfig { std::string name_prefix; - bool is_fast; + bool is_gated; std::shared_ptr schema; decltype(l_batches) batches; - std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); } + std::string name() const { + return name_prefix + ";" + (is_gated ? "gated" : "ungated"); + } }; Gate gate; GatedNodeOptions gate_options(&gate); - // must have at least one fast and one slow + // Two ungated and one gated std::vector source_configs = { - {"0", true, l_schema, l_batches}, - {"1", false, r0_schema, r0_batches}, - {"2", true, r1_schema, r1_batches}, + {"0", false, l_schema, l_batches}, + {"1", true, r0_schema, r0_batches}, + {"2", false, r1_schema, r1_batches}, }; std::vector bp_counters(source_configs.size()); @@ -1603,12 +1626,14 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, std::vector bp_in = {src_decls.back()}; Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in, std::move(options)}; + if (config.is_gated) { + bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options}; + } bp_decls.push_back(bp_decl); } - Declaration asofjoin = { - "asofjoin", bp_decls, - GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)}; + Declaration asofjoin = {"asofjoin", bp_decls, + GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)}; ASSERT_OK_AND_ASSIGN(std::shared_ptr tpool, internal::ThreadPool::Make(1)); @@ -1616,7 +1641,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, Future batches_fut = DeclarationToExecBatchesAsync(asofjoin, exec_ctx); - BusyWait(10.0, [&] { + auto has_bp_been_applied = [&] { int total_paused = 0; for (const auto& counters : bp_counters) { total_paused += counters.pause_count; @@ -1624,7 +1649,10 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, // One of the inputs is gated. The other two will eventually be paused by the asof // join node return total_paused >= 2; - }); + }; + + BusyWait(10.0, has_bp_been_applied); + ASSERT_TRUE(has_bp_been_applied()); gate.ReleaseAllBatches(); ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut); @@ -1637,8 +1665,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, } TEST(AsofJoinTest, BackpressureWithBatches) { - return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1, - /*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false); + return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1); } template @@ -1703,8 +1730,7 @@ T GetEnvValue(const std::string& var, T default_value) { TEST(AsofJoinTest, BackpressureWithBatchesGen) { int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20); int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1); - return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size, - /*fast_delay=*/0.001, /*slow_delay=*/0.01); + return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size); } } // namespace acero From ae5c84009c08a82c470c2bc339f6ed2ec5c4dc5f Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 15 Jun 2023 10:28:59 -0400 Subject: [PATCH 14/19] revert concurrency change --- cpp/src/arrow/acero/asof_join_node.cc | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index d9c273a72ba..cf2a8d616b8 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -370,10 +370,15 @@ struct MemoStore { times_.swap(memo.times_); } - // Updates the current time to `ts` if it is less. Returns true if updated. + // Updates the current time to `ts` if it is less. A different thread may win the race + // to update the current time to more than `ts` but not to less. Returns whether the + // current time was changed from its value at the beginning of this invocation. bool UpdateTime(OnType ts) { - bool update = current_time_ < ts; - if (update) current_time_ = ts; + OnType prev_time = current_time_; + bool update = prev_time < ts; + while (prev_time < ts && !current_time_.compare_exchange_weak(prev_time, ts)) { + // intentionally empty - standard CAS loop + } return update; } @@ -817,11 +822,8 @@ class InputState { ++batches_processed_; latest_ref_row_ = 0; have_active_batch &= !queue_.TryPop(); - if (have_active_batch) { + if (have_active_batch) DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed - key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache - memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed - } } } return have_active_batch; @@ -897,6 +899,8 @@ class InputState { Status Push(const std::shared_ptr& rb) { if (rb->num_rows() > 0) { + key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache + memo_.UpdateTime(GetTime(rb.get(), 0)); // time changed - update in MemoStore queue_.Push(rb); // only after above updates - push batch for processing } else { ++batches_processed_; // don't enqueue empty batches, just record as processed From 29e329ecd2eb7568a315afae7defdb08345df917 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 15 Jun 2023 10:33:53 -0400 Subject: [PATCH 15/19] doc for key hasher invalidation --- cpp/src/arrow/acero/asof_join_node.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index cf2a8d616b8..d3c988e18e3 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -495,9 +495,9 @@ class KeyHasher { 4 * kMiniBatchLength * sizeof(uint32_t)); } - void Invalidate() { - batch_ = NULLPTR; // invalidate cached hashes for batch - required when it changes - } + // invalidate cached hashes for batch - required when it changes + // only this method can be called concurrently with HashesFor + void Invalidate() { batch_ = NULLPTR; } // compute and cache a hash for each row of the given batch const std::vector& HashesFor(const RecordBatch* batch) { From 1e3652f6ebb73ade7f8889474342171f46d9f0c3 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 15 Jun 2023 14:06:05 -0400 Subject: [PATCH 16/19] refactor gate classes --- cpp/src/arrow/acero/asof_join_node_test.cc | 153 +------------------- cpp/src/arrow/acero/test_nodes.cc | 157 +++++++++++++++++++++ cpp/src/arrow/acero/test_nodes.h | 26 ++++ 3 files changed, 188 insertions(+), 148 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index e264bf81511..118d24ac168 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -48,7 +48,6 @@ #include "arrow/testing/random.h" #include "arrow/util/checked_cast.h" #include "arrow/util/thread_pool.h" -#include "arrow/util/tracing_internal.h" #define TRACED_TEST(t_class, t_name, t_body) \ TEST(t_class, t_name) { \ @@ -1416,149 +1415,6 @@ struct BackpressureCountingNode : public MapNode { BackpressureCounters* counters; }; -class Gate { - public: - void ReleaseAllBatches() { - std::lock_guard lg(mutex_); - num_allowed_batches_ = -1; - NotifyAll(); - } - - void ReleaseOneBatch() { - std::lock_guard lg(mutex_); - DCHECK_GE(num_allowed_batches_, 0) - << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()"; - num_allowed_batches_++; - NotifyAll(); - } - - Future<> WaitForNextReleasedBatch() { - std::lock_guard lg(mutex_); - if (current_waiter_.is_valid()) { - return current_waiter_; - } - Future<> fut; - if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) { - num_released_batches_++; - return Future<>::MakeFinished(); - } - - current_waiter_ = Future<>::Make(); - return current_waiter_; - } - - private: - void NotifyAll() { - if (current_waiter_.is_valid()) { - Future<> to_unlock = current_waiter_; - current_waiter_ = {}; - to_unlock.MarkFinished(); - } - } - - Future<> current_waiter_; - int num_released_batches_ = 0; - int num_allowed_batches_ = 0; - std::mutex mutex_; -}; - -struct GatedNodeOptions : public ExecNodeOptions { - explicit GatedNodeOptions(Gate* gate) : gate(gate) {} - Gate* gate; -}; - -struct GatedNode : public ExecNode, public TracedNode { - static constexpr auto kKindName = "BackpressureDelayingNode"; - static constexpr const char* kFactoryName = "backpressure_delay"; - - static void Register() { - auto exec_reg = default_exec_factory_registry(); - if (!exec_reg->GetFactory(kFactoryName).ok()) { - ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make)); - } - } - - GatedNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, const GatedNodeOptions& options) - : ExecNode(plan, inputs, {"input"}, output_schema), - TracedNode(this), - gate_(options.gate) {} - - static Result Make(ExecPlan* plan, std::vector inputs, - const ExecNodeOptions& options) { - RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName)); - auto gated_node_opts = static_cast(options); - return plan->EmplaceNode(plan, inputs, inputs[0]->output_schema(), - gated_node_opts); - } - - const char* kind_name() const override { return kKindName; } - - const Ordering& ordering() const override { return inputs_[0]->ordering(); } - Status InputFinished(ExecNode* input, int total_batches) override { - return output_->InputFinished(this, total_batches); - } - Status StartProducing() override { - NoteStartProducing(ToStringExtra()); - return Status::OK(); - } - - void PauseProducing(ExecNode* output, int32_t counter) override { - inputs_[0]->PauseProducing(this, counter); - } - - void ResumeProducing(ExecNode* output, int32_t counter) override { - inputs_[0]->ResumeProducing(this, counter); - } - - Status StopProducingImpl() override { return Status::OK(); } - - Status SendBatchesUnlocked(std::unique_lock&& lock) { - while (!queued_batches_.empty()) { - // If we are ready to release the batch, do so immediately. - Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch(); - bool callback_added = maybe_unlocked.TryAddCallback([this] { - return [this](const Status& st) { - DCHECK_OK(st); - plan_->query_context()->ScheduleTask( - [this] { - std::unique_lock lk(mutex_); - return SendBatchesUnlocked(std::move(lk)); - }, - "GatedNode::ResumeAfterNotify"); - }; - }); - if (callback_added) { - break; - } - // Otherwise, the future is already finished which means the gate is unlocked - // and we are allowed to send a batch - ExecBatch next = std::move(queued_batches_.front()); - queued_batches_.pop(); - lock.unlock(); - ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next))); - lock.lock(); - } - return Status::OK(); - } - - Status InputReceived(ExecNode* input, ExecBatch batch) override { - auto scope = TraceInputReceived(batch); - DCHECK_EQ(input, inputs_[0]); - - // This may be called concurrently by the source and by a restart attempt. Process - // one at a time (this critical section should be pretty small) - std::unique_lock lk(mutex_); - queued_batches_.push(std::move(batch)); - - return SendBatchesUnlocked(std::move(lk)); - } - - Gate* gate_; - std::queue queued_batches_; - std::mutex mutex_; -}; - AsyncGenerator> GetGen( AsyncGenerator> gen) { return gen; @@ -1588,7 +1444,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2)); BackpressureCountingNode::Register(); - GatedNode::Register(); + RegisterTestNodes(); // for GatedNode struct BackpressureSourceConfig { std::string name_prefix; @@ -1601,8 +1457,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { } }; - Gate gate; - GatedNodeOptions gate_options(&gate); + auto gate_ptr = Gate::Make(); + auto& gate = *gate_ptr; + GatedNodeOptions gate_options(gate_ptr.get()); // Two ungated and one gated std::vector source_configs = { @@ -1627,7 +1484,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in, std::move(options)}; if (config.is_gated) { - bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options}; + bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options}; } bp_decls.push_back(bp_decl); } diff --git a/cpp/src/arrow/acero/test_nodes.cc b/cpp/src/arrow/acero/test_nodes.cc index ff95f72e6e6..e109afbe1bf 100644 --- a/cpp/src/arrow/acero/test_nodes.cc +++ b/cpp/src/arrow/acero/test_nodes.cc @@ -31,6 +31,7 @@ #include "arrow/testing/random.h" #include "arrow/util/checked_cast.h" #include "arrow/util/iterator.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -200,12 +201,168 @@ class JitterNode : public ExecNode { } // namespace +class GateImpl { + public: + void ReleaseAllBatches() { + std::lock_guard lg(mutex_); + num_allowed_batches_ = -1; + NotifyAll(); + } + + void ReleaseOneBatch() { + std::lock_guard lg(mutex_); + DCHECK_GE(num_allowed_batches_, 0) + << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()"; + num_allowed_batches_++; + NotifyAll(); + } + + Future<> WaitForNextReleasedBatch() { + std::lock_guard lg(mutex_); + if (current_waiter_.is_valid()) { + return current_waiter_; + } + Future<> fut; + if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) { + num_released_batches_++; + return Future<>::MakeFinished(); + } + + current_waiter_ = Future<>::Make(); + return current_waiter_; + } + + private: + void NotifyAll() { + if (current_waiter_.is_valid()) { + Future<> to_unlock = current_waiter_; + current_waiter_ = {}; + to_unlock.MarkFinished(); + } + } + + Future<> current_waiter_; + int num_released_batches_ = 0; + int num_allowed_batches_ = 0; + std::mutex mutex_; +}; + +std::shared_ptr Gate::Make() { return std::make_shared(); } + +Gate::Gate() : impl_(new GateImpl()) {} + +Gate::~Gate() { delete impl_; } + +void Gate::ReleaseAllBatches() { impl_->ReleaseAllBatches(); } + +void Gate::ReleaseOneBatch() { impl_->ReleaseOneBatch(); } + +Future<> Gate::WaitForNextReleasedBatch() { return impl_->WaitForNextReleasedBatch(); } + +namespace { + +struct GatedNode : public ExecNode, public TracedNode { + static constexpr auto kKindName = "BackpressureDelayingNode"; + static constexpr const char* kFactoryName = "backpressure_delay"; + + static void Register() { + auto exec_reg = default_exec_factory_registry(); + if (!exec_reg->GetFactory(kFactoryName).ok()) { + ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make)); + } + } + + GatedNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, const GatedNodeOptions& options) + : ExecNode(plan, inputs, {"input"}, output_schema), + TracedNode(this), + gate_(options.gate) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName)); + auto gated_node_opts = static_cast(options); + return plan->EmplaceNode(plan, inputs, inputs[0]->output_schema(), + gated_node_opts); + } + + const char* kind_name() const override { return kKindName; } + + const Ordering& ordering() const override { return inputs_[0]->ordering(); } + Status InputFinished(ExecNode* input, int total_batches) override { + return output_->InputFinished(this, total_batches); + } + Status StartProducing() override { + NoteStartProducing(ToStringExtra()); + return Status::OK(); + } + + void PauseProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->PauseProducing(this, counter); + } + + void ResumeProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->ResumeProducing(this, counter); + } + + Status StopProducingImpl() override { return Status::OK(); } + + Status SendBatchesUnlocked(std::unique_lock&& lock) { + while (!queued_batches_.empty()) { + // If we are ready to release the batch, do so immediately. + Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch(); + bool callback_added = maybe_unlocked.TryAddCallback([this] { + return [this](const Status& st) { + DCHECK_OK(st); + plan_->query_context()->ScheduleTask( + [this] { + std::unique_lock lk(mutex_); + return SendBatchesUnlocked(std::move(lk)); + }, + "GatedNode::ResumeAfterNotify"); + }; + }); + if (callback_added) { + break; + } + // Otherwise, the future is already finished which means the gate is unlocked + // and we are allowed to send a batch + ExecBatch next = std::move(queued_batches_.front()); + queued_batches_.pop(); + lock.unlock(); + ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next))); + lock.lock(); + } + return Status::OK(); + } + + Status InputReceived(ExecNode* input, ExecBatch batch) override { + auto scope = TraceInputReceived(batch); + DCHECK_EQ(input, inputs_[0]); + + // This may be called concurrently by the source and by a restart attempt. Process + // one at a time (this critical section should be pretty small) + std::unique_lock lk(mutex_); + queued_batches_.push(std::move(batch)); + + return SendBatchesUnlocked(std::move(lk)); + } + + Gate* gate_; + std::queue queued_batches_; + std::mutex mutex_; +}; + +} // namespace + void RegisterTestNodes() { static std::once_flag registered; std::call_once(registered, [] { ExecFactoryRegistry* registry = default_exec_factory_registry(); DCHECK_OK( registry->AddFactory(std::string(JitterNodeOptions::kName), JitterNode::Make)); + DCHECK_OK( + registry->AddFactory(std::string(GatedNodeOptions::kName), GatedNode::Make)); }); } diff --git a/cpp/src/arrow/acero/test_nodes.h b/cpp/src/arrow/acero/test_nodes.h index 2d1d630b3b7..b2cf66353b0 100644 --- a/cpp/src/arrow/acero/test_nodes.h +++ b/cpp/src/arrow/acero/test_nodes.h @@ -53,6 +53,32 @@ struct JitterNodeOptions : public ExecNodeOptions { static constexpr std::string_view kName = "jitter"; }; +class GateImpl; + +class Gate { + public: + static std::shared_ptr Make(); + + Gate(); + virtual ~Gate(); + + void ReleaseAllBatches(); + void ReleaseOneBatch(); + Future<> WaitForNextReleasedBatch(); + + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(Gate); + + GateImpl* impl_; +}; + +struct GatedNodeOptions : public ExecNodeOptions { + explicit GatedNodeOptions(Gate* gate) : gate(gate) {} + Gate* gate; + + static constexpr std::string_view kName = "gated"; +}; + void RegisterTestNodes(); } // namespace acero From c458c2ee7442725d0a82a644a53dae52d51338e5 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 15 Jun 2023 14:31:50 -0400 Subject: [PATCH 17/19] tighten pause counts check --- cpp/src/arrow/acero/asof_join_node_test.cc | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 118d24ac168..dad93f7063d 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1499,13 +1499,17 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { DeclarationToExecBatchesAsync(asofjoin, exec_ctx); auto has_bp_been_applied = [&] { - int total_paused = 0; - for (const auto& counters : bp_counters) { - total_paused += counters.pause_count; - } // One of the inputs is gated. The other two will eventually be paused by the asof // join node - return total_paused >= 2; + for (size_t i = 0; i < source_configs.size(); i++) { + const auto& counters = bp_counters[i]; + if (source_configs[i].is_gated) { + if (counters.pause_count > 0) return false; + } else { + if (counters.pause_count != 1) return false; + } + } + return true; }; BusyWait(10.0, has_bp_been_applied); From 2a1bac175e979a51fea584bc67f3d8d36c600609 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 15 Jun 2023 14:34:00 -0400 Subject: [PATCH 18/19] doc GatedNodeOptions --- cpp/src/arrow/acero/test_nodes.h | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/acero/test_nodes.h b/cpp/src/arrow/acero/test_nodes.h index b2cf66353b0..7e31aa31b34 100644 --- a/cpp/src/arrow/acero/test_nodes.h +++ b/cpp/src/arrow/acero/test_nodes.h @@ -72,6 +72,7 @@ class Gate { GateImpl* impl_; }; +// A node that holds all input batches until a given gate is released struct GatedNodeOptions : public ExecNodeOptions { explicit GatedNodeOptions(Gate* gate) : gate(gate) {} Gate* gate; From 05ea6c8d75a1bcbc096b0d5825defb3aa40bfe34 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Fri, 16 Jun 2023 08:44:31 -0400 Subject: [PATCH 19/19] stronger test condition for resume --- cpp/src/arrow/acero/asof_join_node_test.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index dad93f7063d..c62d0c0b85f 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1518,11 +1518,14 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { gate.ReleaseAllBatches(); ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut); - size_t total_resumed = 0; - for (const auto& counters : bp_counters) { - total_resumed += counters.resume_count; + // One of the inputs is gated. The other two will eventually be resumed by the asof + // join node + for (size_t i = 0; i < source_configs.size(); i++) { + const auto& counters = bp_counters[i]; + if (!source_configs[i].is_gated) { + ASSERT_GE(counters.resume_count, 0); + } } - ASSERT_GE(total_resumed, 2); } TEST(AsofJoinTest, BackpressureWithBatches) {