From e39628da07fe4e1500197e5a8a62a4fbf9dd58c7 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 26 Jan 2022 14:20:21 +0530 Subject: [PATCH 01/18] adding table source node --- cpp/src/arrow/compute/exec/options.h | 11 ++++ cpp/src/arrow/compute/exec/plan_test.cc | 26 ++++++++++ cpp/src/arrow/compute/exec/source_node.cc | 63 +++++++++++++++++++++++ 3 files changed, 100 insertions(+) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 9544b9ed4db..1e80322b2b8 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -55,6 +55,17 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { std::function>()> generator; }; +/// \brief Adapt an Table as a source node +/// +/// plan->exec_context()->executor() will be used to parallelize pushing to +/// outputs, if provided. +class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { + public: + TableSourceNodeOptions(std::shared_ptr table) : table(table) {} + + std::shared_ptr
table; +}; + /// \brief Make a node which excludes some rows from batches passed through it /// /// filter_expression will be evaluated against each batch which is pushed to diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index b4b24e832ef..77eff9e511a 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -238,6 +238,32 @@ TEST(ExecPlanExecution, SourceSink) { } } +TEST(ExecPlanExecution, TableSourceSink) { + for (bool slow : {false, true}) { + SCOPED_TRACE(slow ? "slowed" : "unslowed"); + + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel" : "single threaded"); + + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + AsyncGenerator> sink_gen; + + auto exec_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN( + auto table, TableFromExecBatches(exec_batches.schema, exec_batches.batches)); + + ASSERT_OK(Declaration::Sequence({ + {"table", TableSourceNodeOptions{table}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); + + ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray(exec_batches.batches)))); + } + } +} + TEST(ExecPlanExecution, SinkNodeBackpressure) { constexpr uint32_t kPauseIfAbove = 4; constexpr uint32_t kResumeIfBelow = 2; diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 6d47609d2a2..c7b6cbbc793 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -25,6 +25,7 @@ #include "arrow/compute/exec_internal.h" #include "arrow/datum.h" #include "arrow/result.h" +#include "arrow/table.h" #include "arrow/util/async_generator.h" #include "arrow/util/async_util.h" #include "arrow/util/checked_cast.h" @@ -34,10 +35,12 @@ #include "arrow/util/thread_pool.h" #include "arrow/util/tracing_internal.h" #include "arrow/util/unreachable.h" +#include "arrow/util/vector.h" namespace 
arrow { using internal::checked_cast; +using internal::MapVector; namespace compute { namespace { @@ -174,12 +177,72 @@ struct SourceNode : ExecNode { AsyncGenerator> generator_; }; +struct TableSourceNode : public SourceNode { + TableSourceNode(ExecPlan* plan, std::shared_ptr output_schema, + std::shared_ptr
table) + : SourceNode(plan, output_schema, + generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode")); + const auto& table_options = checked_cast(options); + return plan->EmplaceNode(plan, table_options.table->schema(), + table_options.table); + } + const char* kind_name() const override { return "TableSourceNode"; } + + [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override { + SourceNode::InputReceived(input, batch); + } + [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override { + SourceNode::ErrorReceived(input, status); + } + [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override { + SourceNode::InputFinished(input, total_batches); + } + + Status StartProducing() override { return SourceNode::StartProducing(); } + + void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); } + + void StopProducing() override { SourceNode::StopProducing(); } + + Future<> finished() override { return SourceNode::finished(); } + + arrow::AsyncGenerator> generator( + std::vector batches) { + auto opt_batches = MapVector( + [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches); + AsyncGenerator> gen; + gen = MakeVectorGenerator(std::move(opt_batches)); + return gen; + } + + arrow::Result> ConvertTableToExecBatches(const Table& table) { + std::shared_ptr reader = std::make_shared(table); + std::shared_ptr batch; + std::vector> batch_vector; + std::vector exec_batches; + while (true) { + ARROW_ASSIGN_OR_RAISE(batch, reader->Next()); + if (batch == NULLPTR) { + break; + } + ExecBatch exec_batch{*batch}; + exec_batches.push_back(exec_batch); + } + return exec_batches; + } +}; + } // namespace namespace internal { void RegisterSourceNode(ExecFactoryRegistry* registry) { 
DCHECK_OK(registry->AddFactory("source", SourceNode::Make)); + DCHECK_OK(registry->AddFactory("table", TableSourceNode::Make)); } } // namespace internal From f2a64c70298ce7b1af70f7fa61a6010f138274d8 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 26 Jan 2022 14:53:09 +0530 Subject: [PATCH 02/18] adding max_chunk_size --- cpp/src/arrow/compute/exec/options.h | 4 +++- cpp/src/arrow/compute/exec/source_node.cc | 17 ++++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 1e80322b2b8..8cc5fb8bd62 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -61,9 +61,11 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { /// outputs, if provided. class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { public: - TableSourceNodeOptions(std::shared_ptr
table) : table(table) {} + TableSourceNodeOptions(std::shared_ptr
table, int64_t max_chunksize) + : table(table), max_chunksize(max_chunksize) {} std::shared_ptr
table; + int64_t max_chunksize; }; /// \brief Make a node which excludes some rows from batches passed through it diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index c7b6cbbc793..8a4eee23ba1 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -179,16 +179,18 @@ struct SourceNode : ExecNode { struct TableSourceNode : public SourceNode { TableSourceNode(ExecPlan* plan, std::shared_ptr output_schema, - std::shared_ptr
table) + std::shared_ptr
table, int64_t max_chunksize) : SourceNode(plan, output_schema, - generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())) {} + generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())), + max_chunksize(max_chunksize) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode")); const auto& table_options = checked_cast(options); return plan->EmplaceNode(plan, table_options.table->schema(), - table_options.table); + table_options.table, + table_options.max_chunksize); } const char* kind_name() const override { return "TableSourceNode"; } @@ -221,6 +223,12 @@ struct TableSourceNode : public SourceNode { arrow::Result> ConvertTableToExecBatches(const Table& table) { std::shared_ptr reader = std::make_shared(table); + + // setting chunksize for the batch reader + if (max_chunksize > 0) { + reader->set_chunksize(max_chunksize); + } + std::shared_ptr batch; std::vector> batch_vector; std::vector exec_batches; @@ -234,6 +242,9 @@ struct TableSourceNode : public SourceNode { } return exec_batches; } + + private: + int64_t max_chunksize; }; } // namespace From 20c9a6fdf8af2d1a86020d52b1c259af60af971d Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 26 Jan 2022 14:53:43 +0530 Subject: [PATCH 03/18] adding test case --- cpp/src/arrow/compute/exec/plan_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 77eff9e511a..4e663b059aa 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -253,7 +253,7 @@ TEST(ExecPlanExecution, TableSourceSink) { auto table, TableFromExecBatches(exec_batches.schema, exec_batches.batches)); ASSERT_OK(Declaration::Sequence({ - {"table", TableSourceNodeOptions{table}}, + {"table", TableSourceNodeOptions{table, 1}}, {"sink", SinkNodeOptions{&sink_gen}}, }) 
.AddToPlan(plan.get())); From bc420a51fa305c0d42c536264a736e9bef27e248 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 27 Jan 2022 18:42:08 +0530 Subject: [PATCH 04/18] refactor max_chunksize to batch_size --- cpp/src/arrow/compute/exec/options.h | 6 +++--- cpp/src/arrow/compute/exec/plan_test.cc | 11 ++++++----- cpp/src/arrow/compute/exec/source_node.cc | 14 +++++++------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 8cc5fb8bd62..9359a17db27 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -61,11 +61,11 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { /// outputs, if provided. class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { public: - TableSourceNodeOptions(std::shared_ptr
table, int64_t max_chunksize) - : table(table), max_chunksize(max_chunksize) {} + TableSourceNodeOptions(std::shared_ptr
table, int64_t batch_size) + : table(table), batch_size(batch_size) {} std::shared_ptr
table; - int64_t max_chunksize; + int64_t batch_size; }; /// \brief Make a node which excludes some rows from batches passed through it diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 4e663b059aa..d111ceeb897 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -248,18 +248,19 @@ TEST(ExecPlanExecution, TableSourceSink) { ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); AsyncGenerator> sink_gen; - auto exec_batches = MakeBasicBatches(); + auto exp_batches = MakeBasicBatches(); ASSERT_OK_AND_ASSIGN( - auto table, TableFromExecBatches(exec_batches.schema, exec_batches.batches)); + auto table, TableFromExecBatches(exp_batches.schema, exp_batches.batches)); ASSERT_OK(Declaration::Sequence({ - {"table", TableSourceNodeOptions{table, 1}}, + {"table_source", TableSourceNodeOptions{table, 1}}, {"sink", SinkNodeOptions{&sink_gen}}, }) .AddToPlan(plan.get())); - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(exec_batches.batches)))); + ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen)); + ASSERT_OK_AND_ASSIGN(auto out_table, TableFromExecBatches(exp_batches.schema, res)); + AssertTablesEqual(table, out_table); } } } diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 8a4eee23ba1..edff12f53a4 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -179,10 +179,10 @@ struct SourceNode : ExecNode { struct TableSourceNode : public SourceNode { TableSourceNode(ExecPlan* plan, std::shared_ptr output_schema, - std::shared_ptr
table, int64_t max_chunksize) + std::shared_ptr
table, int64_t batch_size) : SourceNode(plan, output_schema, generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())), - max_chunksize(max_chunksize) {} + batch_size(batch_size) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -190,7 +190,7 @@ struct TableSourceNode : public SourceNode { const auto& table_options = checked_cast(options); return plan->EmplaceNode(plan, table_options.table->schema(), table_options.table, - table_options.max_chunksize); + table_options.batch_size); } const char* kind_name() const override { return "TableSourceNode"; } @@ -225,8 +225,8 @@ struct TableSourceNode : public SourceNode { std::shared_ptr reader = std::make_shared(table); // setting chunksize for the batch reader - if (max_chunksize > 0) { - reader->set_chunksize(max_chunksize); + if (batch_size > 0) { + reader->set_chunksize(batch_size); } std::shared_ptr batch; @@ -244,7 +244,7 @@ struct TableSourceNode : public SourceNode { } private: - int64_t max_chunksize; + int64_t batch_size; }; } // namespace @@ -253,7 +253,7 @@ namespace internal { void RegisterSourceNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("source", SourceNode::Make)); - DCHECK_OK(registry->AddFactory("table", TableSourceNode::Make)); + DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make)); } } // namespace internal From 5f943c7cedbce460ec012abcf0c152543157d409 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 27 Jan 2022 18:52:09 +0530 Subject: [PATCH 05/18] fixed format --- cpp/src/arrow/compute/exec/plan_test.cc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index d111ceeb897..8d0e97fee82 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -249,14 +249,15 @@ TEST(ExecPlanExecution, TableSourceSink) { AsyncGenerator> sink_gen; auto 
exp_batches = MakeBasicBatches(); - ASSERT_OK_AND_ASSIGN( - auto table, TableFromExecBatches(exp_batches.schema, exp_batches.batches)); + ASSERT_OK_AND_ASSIGN(auto table, + TableFromExecBatches(exp_batches.schema, exp_batches.batches)); - ASSERT_OK(Declaration::Sequence({ - {"table_source", TableSourceNodeOptions{table, 1}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); + ASSERT_OK( + Declaration::Sequence({ + {"table_source", TableSourceNodeOptions{table, 1}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen)); ASSERT_OK_AND_ASSIGN(auto out_table, TableFromExecBatches(exp_batches.schema, res)); From 34e93452d426d063017dfdb5caf1e911f35eeceb Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 1 Feb 2022 10:43:11 +0530 Subject: [PATCH 06/18] addressed review comments --- cpp/src/arrow/compute/exec/options.h | 10 ++-- cpp/src/arrow/compute/exec/plan_test.cc | 48 +++++++++++++------ cpp/src/arrow/compute/exec/source_node.cc | 57 +++++++++++------------ 3 files changed, 68 insertions(+), 47 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 9359a17db27..b3777b1600d 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -56,15 +56,19 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { }; /// \brief Adapt an Table as a source node -/// -/// plan->exec_context()->executor() will be used to parallelize pushing to -/// outputs, if provided. +/// The table will be sent through the exec plan in batches. +/// Each batch will be submitted as a new thread task +/// if plan->exec_context()->executor() is not null. class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { public: TableSourceNodeOptions(std::shared_ptr
table, int64_t batch_size) : table(table), batch_size(batch_size) {} + // arrow table which acts as the data source std::shared_ptr
table; + // batch size which used to set the chunk_size to + // the table batch reader used in building the data source + // from the table int64_t batch_size; }; diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 8d0e97fee82..a42effe22b6 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -245,27 +245,47 @@ TEST(ExecPlanExecution, TableSourceSink) { for (bool parallel : {false, true}) { SCOPED_TRACE(parallel ? "parallel" : "single threaded"); - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; + for (int batch_size : {1, 4}) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + AsyncGenerator> sink_gen; - auto exp_batches = MakeBasicBatches(); - ASSERT_OK_AND_ASSIGN(auto table, - TableFromExecBatches(exp_batches.schema, exp_batches.batches)); + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN( + auto table, TableFromExecBatches(exp_batches.schema, exp_batches.batches)); - ASSERT_OK( - Declaration::Sequence({ - {"table_source", TableSourceNodeOptions{table, 1}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); + ASSERT_OK(Declaration::Sequence( + { + {"table_source", TableSourceNodeOptions{table, batch_size}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); - ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen)); - ASSERT_OK_AND_ASSIGN(auto out_table, TableFromExecBatches(exp_batches.schema, res)); - AssertTablesEqual(table, out_table); + ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen)); + ASSERT_OK_AND_ASSIGN(auto out_table, + TableFromExecBatches(exp_batches.schema, res)); + AssertTablesEqual(table, out_table); + } } } } +TEST(ExecPlanExecution, TableSourceSinkError) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + AsyncGenerator> sink_gen; + + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto 
table, + TableFromExecBatches(exp_batches.schema, exp_batches.batches)); + + auto null_table_options = TableSourceNodeOptions{nullptr, 1}; + ASSERT_THAT(MakeExecNode("table_source", plan.get(), {}, null_table_options), + Raises(StatusCode::Invalid, HasSubstr("not null"))); + + auto negative_batch_size_options = TableSourceNodeOptions{table, -1}; + ASSERT_THAT(MakeExecNode("table_source", plan.get(), {}, negative_batch_size_options), + Raises(StatusCode::Invalid, HasSubstr("batch_size > 0"))); +} + TEST(ExecPlanExecution, SinkNodeBackpressure) { constexpr uint32_t kPauseIfAbove = 4; constexpr uint32_t kResumeIfBelow = 2; diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index edff12f53a4..402fb205a9c 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -26,6 +26,7 @@ #include "arrow/datum.h" #include "arrow/result.h" #include "arrow/table.h" +#include "arrow/testing/gtest_util.h" #include "arrow/util/async_generator.h" #include "arrow/util/async_util.h" #include "arrow/util/checked_cast.h" @@ -178,42 +179,41 @@ struct SourceNode : ExecNode { }; struct TableSourceNode : public SourceNode { - TableSourceNode(ExecPlan* plan, std::shared_ptr output_schema, - std::shared_ptr
table, int64_t batch_size) - : SourceNode(plan, output_schema, - generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())), - batch_size(batch_size) {} + TableSourceNode(ExecPlan* plan, std::shared_ptr
table, int64_t batch_size) + : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode")); const auto& table_options = checked_cast(options); - return plan->EmplaceNode(plan, table_options.table->schema(), - table_options.table, - table_options.batch_size); - } - const char* kind_name() const override { return "TableSourceNode"; } + auto table = table_options.table; + auto batch_size = table_options.batch_size; - [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override { - SourceNode::InputReceived(input, batch); - } - [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override { - SourceNode::ErrorReceived(input, status); - } - [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override { - SourceNode::InputFinished(input, total_batches); + RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode")); + + return plan->EmplaceNode(plan, table, batch_size); } - Status StartProducing() override { return SourceNode::StartProducing(); } + const char* kind_name() const override { return "TableSourceNode"; } - void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); } + static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr
table, + const int batch_size, + const char* kind_name) { + if (table == nullptr) { + return Status::Invalid(kind_name, " node requires table which is not null"); + } - void StopProducing() override { SourceNode::StopProducing(); } + if (batch_size <= 0) { + return Status::Invalid( + kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size); + } - Future<> finished() override { return SourceNode::finished(); } + return Status::OK(); + } - arrow::AsyncGenerator> generator( - std::vector batches) { + static arrow::AsyncGenerator> TableGenerator( + const Table& table, const int batch_size) { + auto batches = ConvertTableToExecBatches(table, batch_size); auto opt_batches = MapVector( [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches); AsyncGenerator> gen; @@ -221,7 +221,8 @@ struct TableSourceNode : public SourceNode { return gen; } - arrow::Result> ConvertTableToExecBatches(const Table& table) { + static std::vector ConvertTableToExecBatches(const Table& table, + const int batch_size) { std::shared_ptr reader = std::make_shared(table); // setting chunksize for the batch reader @@ -230,10 +231,9 @@ struct TableSourceNode : public SourceNode { } std::shared_ptr batch; - std::vector> batch_vector; std::vector exec_batches; while (true) { - ARROW_ASSIGN_OR_RAISE(batch, reader->Next()); + ASSIGN_OR_ABORT(batch, reader->Next()); if (batch == NULLPTR) { break; } @@ -242,9 +242,6 @@ struct TableSourceNode : public SourceNode { } return exec_batches; } - - private: - int64_t batch_size; }; } // namespace From 8f1342f76dbba65006a0815e07ad9f0bc69944b1 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 1 Feb 2022 14:47:40 +0530 Subject: [PATCH 07/18] fixing an import issue --- cpp/src/arrow/compute/exec/source_node.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 402fb205a9c..6108590c501 100644 --- 
a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -26,7 +26,6 @@ #include "arrow/datum.h" #include "arrow/result.h" #include "arrow/table.h" -#include "arrow/testing/gtest_util.h" #include "arrow/util/async_generator.h" #include "arrow/util/async_util.h" #include "arrow/util/checked_cast.h" @@ -233,7 +232,10 @@ struct TableSourceNode : public SourceNode { std::shared_ptr batch; std::vector exec_batches; while (true) { - ASSIGN_OR_ABORT(batch, reader->Next()); + auto batch_res = reader->Next(); + if (batch_res.ok()) { + batch = batch_res.ValueOrDie(); + } if (batch == NULLPTR) { break; } From 8cd680df6c6398e1db1fd7ff6bca2e91a20840f2 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 1 Feb 2022 18:20:48 +0530 Subject: [PATCH 08/18] updated func args --- cpp/src/arrow/compute/exec/source_node.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 6108590c501..8a6d9a52ae1 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -196,7 +196,7 @@ struct TableSourceNode : public SourceNode { const char* kind_name() const override { return "TableSourceNode"; } static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr
table, - const int batch_size, + const int64_t batch_size, const char* kind_name) { if (table == nullptr) { return Status::Invalid(kind_name, " node requires table which is not null"); @@ -221,7 +221,7 @@ struct TableSourceNode : public SourceNode { } static std::vector ConvertTableToExecBatches(const Table& table, - const int batch_size) { + const int64_t batch_size) { std::shared_ptr reader = std::make_shared(table); // setting chunksize for the batch reader From 6c0f139ceb63375ef9cf2cdf8d274077b21e429e Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 1 Feb 2022 18:31:44 +0530 Subject: [PATCH 09/18] minor fix --- cpp/src/arrow/compute/exec/source_node.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 8a6d9a52ae1..02a50ec2db8 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -229,7 +229,7 @@ struct TableSourceNode : public SourceNode { reader->set_chunksize(batch_size); } - std::shared_ptr batch; + std::shared_ptr batch; std::vector exec_batches; while (true) { auto batch_res = reader->Next(); From 921b35c2618520e189e3699511f89e39ca9c2034 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 2 Feb 2022 06:45:54 +0530 Subject: [PATCH 10/18] update test case and minor fix --- cpp/src/arrow/compute/exec/plan_test.cc | 41 +++++++++-------------- cpp/src/arrow/compute/exec/source_node.cc | 4 +-- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index a42effe22b6..e176c701b65 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -239,33 +239,24 @@ TEST(ExecPlanExecution, SourceSink) { } TEST(ExecPlanExecution, TableSourceSink) { - for (bool slow : {false, true}) { - SCOPED_TRACE(slow ? 
"slowed" : "unslowed"); - - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel" : "single threaded"); - - for (int batch_size : {1, 4}) { - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - AsyncGenerator> sink_gen; + for (int batch_size : {1, 4}) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + AsyncGenerator> sink_gen; - auto exp_batches = MakeBasicBatches(); - ASSERT_OK_AND_ASSIGN( - auto table, TableFromExecBatches(exp_batches.schema, exp_batches.batches)); + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto table, + TableFromExecBatches(exp_batches.schema, exp_batches.batches)); - ASSERT_OK(Declaration::Sequence( - { - {"table_source", TableSourceNodeOptions{table, batch_size}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); + ASSERT_OK(Declaration::Sequence( + { + {"table_source", TableSourceNodeOptions{table, batch_size}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); - ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen)); - ASSERT_OK_AND_ASSIGN(auto out_table, - TableFromExecBatches(exp_batches.schema, res)); - AssertTablesEqual(table, out_table); - } - } + ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen)); + ASSERT_OK_AND_ASSIGN(auto out_table, TableFromExecBatches(exp_batches.schema, res)); + AssertTablesEqual(table, out_table); } } @@ -277,7 +268,7 @@ TEST(ExecPlanExecution, TableSourceSinkError) { ASSERT_OK_AND_ASSIGN(auto table, TableFromExecBatches(exp_batches.schema, exp_batches.batches)); - auto null_table_options = TableSourceNodeOptions{nullptr, 1}; + auto null_table_options = TableSourceNodeOptions{NULLPTR, 1}; ASSERT_THAT(MakeExecNode("table_source", plan.get(), {}, null_table_options), Raises(StatusCode::Invalid, HasSubstr("not null"))); diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 02a50ec2db8..67f8dfef2c8 100644 --- 
a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -185,8 +185,8 @@ struct TableSourceNode : public SourceNode { const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode")); const auto& table_options = checked_cast(options); - auto table = table_options.table; - auto batch_size = table_options.batch_size; + const auto table = table_options.table; + const int64_t batch_size = table_options.batch_size; RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode")); From 89cabcd7238871d334f4f8060a346f6e9e5a6b28 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 2 Feb 2022 07:00:13 +0530 Subject: [PATCH 11/18] minor change on func args --- cpp/src/arrow/compute/exec/source_node.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 67f8dfef2c8..c66c6c730a2 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -211,7 +211,7 @@ struct TableSourceNode : public SourceNode { } static arrow::AsyncGenerator> TableGenerator( - const Table& table, const int batch_size) { + const Table& table, const int64_t batch_size) { auto batches = ConvertTableToExecBatches(table, batch_size); auto opt_batches = MapVector( [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches); From 889b5868953cfb41ca5d8e0a40cf2428cbca7353 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 26 Jan 2022 14:20:21 +0530 Subject: [PATCH 12/18] rebase --- cpp/src/arrow/compute/exec/source_node.cc | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index c66c6c730a2..06dc36e1611 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -220,6 +220,7 @@ struct 
TableSourceNode : public SourceNode { return gen; } +<<<<<<< HEAD static std::vector ConvertTableToExecBatches(const Table& table, const int64_t batch_size) { std::shared_ptr reader = std::make_shared(table); @@ -236,6 +237,15 @@ struct TableSourceNode : public SourceNode { if (batch_res.ok()) { batch = batch_res.ValueOrDie(); } +======= + arrow::Result> ConvertTableToExecBatches(const Table& table) { + std::shared_ptr reader = std::make_shared(table); + std::shared_ptr batch; + std::vector> batch_vector; + std::vector exec_batches; + while (true) { + ARROW_ASSIGN_OR_RAISE(batch, reader->Next()); +>>>>>>> f505d915f (adding table source node) if (batch == NULLPTR) { break; } @@ -252,7 +262,11 @@ namespace internal { void RegisterSourceNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("source", SourceNode::Make)); +<<<<<<< HEAD DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make)); +======= + DCHECK_OK(registry->AddFactory("table", TableSourceNode::Make)); +>>>>>>> f505d915f (adding table source node) } } // namespace internal From ce147c04e0e8fe2d5869e12eeb45636d613976be Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 26 Jan 2022 14:20:21 +0530 Subject: [PATCH 13/18] resolve merge conflicts --- cpp/src/arrow/compute/exec/options.h | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index b3777b1600d..19241ac9711 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -56,8 +56,6 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { }; /// \brief Adapt an Table as a source node -/// The table will be sent through the exec plan in batches. -/// Each batch will be submitted as a new thread task /// if plan->exec_context()->executor() is not null. 
class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { public: @@ -72,26 +70,6 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { int64_t batch_size; }; -/// \brief Make a node which excludes some rows from batches passed through it -/// -/// filter_expression will be evaluated against each batch which is pushed to -/// this node. Any rows for which filter_expression does not evaluate to `true` will be -/// excluded in the batch emitted by this node. -class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { - public: - explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true) - : filter_expression(std::move(filter_expression)), async_mode(async_mode) {} - - Expression filter_expression; - bool async_mode; -}; - -/// \brief Make a node which executes expressions on input batches, producing new batches. -/// -/// Each expression will be evaluated against each batch which is pushed to -/// this node to produce a corresponding output column. -/// -/// If names are not provided, the string representations of exprs will be used. 
class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions { public: explicit ProjectNodeOptions(std::vector expressions, From 4fd4ec58cf83b7aa34d998e016973ba8a939bae1 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Fri, 4 Feb 2022 19:39:52 +0530 Subject: [PATCH 14/18] resolving erroneous commit --- cpp/src/arrow/compute/exec/options.h | 14 ++++++++++++++ cpp/src/arrow/compute/exec/source_node.cc | 14 -------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 19241ac9711..cfdc07f8aef 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -70,6 +70,20 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { int64_t batch_size; }; +/// \brief Make a node which excludes some rows from batches passed through it +/// +/// filter_expression will be evaluated against each batch which is pushed to +/// this node. Any rows for which filter_expression does not evaluate to `true` will be +/// excluded in the batch emitted by this node. 
+class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { + public: + explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true) + : filter_expression(std::move(filter_expression)), async_mode(async_mode) {} + + Expression filter_expression; + bool async_mode; +}; + class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions { public: explicit ProjectNodeOptions(std::vector expressions, diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 06dc36e1611..c66c6c730a2 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -220,7 +220,6 @@ struct TableSourceNode : public SourceNode { return gen; } -<<<<<<< HEAD static std::vector ConvertTableToExecBatches(const Table& table, const int64_t batch_size) { std::shared_ptr reader = std::make_shared(table); @@ -237,15 +236,6 @@ struct TableSourceNode : public SourceNode { if (batch_res.ok()) { batch = batch_res.ValueOrDie(); } -======= - arrow::Result> ConvertTableToExecBatches(const Table& table) { - std::shared_ptr reader = std::make_shared(table); - std::shared_ptr batch; - std::vector> batch_vector; - std::vector exec_batches; - while (true) { - ARROW_ASSIGN_OR_RAISE(batch, reader->Next()); ->>>>>>> f505d915f (adding table source node) if (batch == NULLPTR) { break; } @@ -262,11 +252,7 @@ namespace internal { void RegisterSourceNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("source", SourceNode::Make)); -<<<<<<< HEAD DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make)); -======= - DCHECK_OK(registry->AddFactory("table", TableSourceNode::Make)); ->>>>>>> f505d915f (adding table source node) } } // namespace internal From 150c902cde10544db03a49d8a860b31374c0f1bb Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Fri, 4 Feb 2022 19:43:09 +0530 Subject: [PATCH 15/18] fixing an errorneous commit v2 --- cpp/src/arrow/compute/exec/options.h | 6 ++++++ 1 
file changed, 6 insertions(+) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index cfdc07f8aef..ed46cb3311e 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -84,6 +84,12 @@ class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { bool async_mode; }; +/// \brief Make a node which executes expressions on input batches, producing new batches. +/// +/// Each expression will be evaluated against each batch which is pushed to +/// this node to produce a corresponding output column. +/// +/// If names are not provided, the string representations of exprs will be used. class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions { public: explicit ProjectNodeOptions(std::vector expressions, From 5618184b13aa38c5fcb37cb13b3debf1851a84f8 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 15 Feb 2022 13:45:51 +0530 Subject: [PATCH 16/18] resolved review comments --- cpp/src/arrow/compute/exec/options.h | 7 +++--- cpp/src/arrow/compute/exec/source_node.cc | 30 +++++++++++------------ 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index ed46cb3311e..fd9858b7ff4 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -55,7 +55,6 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { std::function>()> generator; }; -/// \brief Adapt an Table as a source node /// if plan->exec_context()->executor() is not null. class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { public: @@ -64,9 +63,9 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { // arrow table which acts as the data source std::shared_ptr
table; - // batch size which used to set the chunk_size to - // the table batch reader used in building the data source - // from the table + // Size of batches to emit from this node + // If the table is larger the node will emit multiple batches from + // the table to be processed in parallel. int64_t batch_size; }; diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index c66c6c730a2..6b9a2ee9132 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -179,32 +179,32 @@ struct SourceNode : ExecNode { struct TableSourceNode : public SourceNode { TableSourceNode(ExecPlan* plan, std::shared_ptr
table, int64_t batch_size) - : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {} + : SourceNode(plan, table->schema(), TableGenerator(*table, batch_size)) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode")); const auto& table_options = checked_cast(options); - const auto table = table_options.table; + const auto& table = table_options.table; const int64_t batch_size = table_options.batch_size; - RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode")); + RETURN_NOT_OK(ValidateTableSourceNodeInput(table, batch_size)); return plan->EmplaceNode(plan, table, batch_size); } const char* kind_name() const override { return "TableSourceNode"; } - static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr
table, - const int64_t batch_size, - const char* kind_name) { + static arrow::Status ValidateTableSourceNodeInput(const std::shared_ptr
table, + const int64_t batch_size) { if (table == nullptr) { - return Status::Invalid(kind_name, " node requires table which is not null"); + return Status::Invalid("TableSourceNode node requires table which is not null"); } if (batch_size <= 0) { return Status::Invalid( - kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size); + "TableSourceNode node requires, batch_size > 0 , but got batch size ", + batch_size); } return Status::OK(); @@ -213,8 +213,9 @@ struct TableSourceNode : public SourceNode { static arrow::AsyncGenerator> TableGenerator( const Table& table, const int64_t batch_size) { auto batches = ConvertTableToExecBatches(table, batch_size); - auto opt_batches = MapVector( - [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches); + auto opt_batches = + MapVector([](ExecBatch batch) { return util::make_optional(std::move(batch)); }, + std::move(batches)); AsyncGenerator> gen; gen = MakeVectorGenerator(std::move(opt_batches)); return gen; @@ -225,22 +226,19 @@ struct TableSourceNode : public SourceNode { std::shared_ptr reader = std::make_shared(table); // setting chunksize for the batch reader - if (batch_size > 0) { - reader->set_chunksize(batch_size); - } + reader->set_chunksize(batch_size); std::shared_ptr batch; std::vector exec_batches; while (true) { auto batch_res = reader->Next(); if (batch_res.ok()) { - batch = batch_res.ValueOrDie(); + batch = std::move(batch_res).MoveValueUnsafe(); } if (batch == NULLPTR) { break; } - ExecBatch exec_batch{*batch}; - exec_batches.push_back(exec_batch); + exec_batches.emplace_back(*batch); } return exec_batches; } From dec8ea28a93169009fe4e6fedb978d50b60056fc Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 24 Feb 2022 10:50:51 +0530 Subject: [PATCH 17/18] minor change --- cpp/src/arrow/compute/exec/options.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 
fd9858b7ff4..20500915ee1 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -55,7 +55,7 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { std::function>()> generator; }; -/// if plan->exec_context()->executor() is not null. +/// \brief An extended Source node which accepts a table class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { public: TableSourceNodeOptions(std::shared_ptr
table, int64_t batch_size) From d1f2234c65bf39402954c6582f3608556ba0b87f Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Thu, 24 Feb 2022 12:49:16 +0530 Subject: [PATCH 18/18] format fix --- cpp/src/arrow/compute/exec/options.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 20500915ee1..d2ad45d37b9 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -55,7 +55,7 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { std::function>()> generator; }; -/// \brief An extended Source node which accepts a table +/// \brief An extended Source node which accepts a table class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { public: TableSourceNodeOptions(std::shared_ptr
table, int64_t batch_size)