From 88b046ede6f41e53313e6b76b754a4dd7605a726 Mon Sep 17 00:00:00 2001
From: Weston Pace <weston.pace@gmail.com>
Date: Fri, 25 Mar 2022 13:28:33 -1000
Subject: [PATCH 1/7] ARROW-16033: Modifies the sink node consumer interface so
 that we pass the schema along with each batch

---
 .../arrow/engine_substrait_consumption.cc     |  3 ++-
 .../execution_plan_documentation_examples.cc  |  5 +++--
 cpp/src/arrow/compute/exec/options.h          |  5 ++---
 cpp/src/arrow/compute/exec/plan_test.cc       | 14 ++++++++++----
 cpp/src/arrow/compute/exec/sink_node.cc       | 17 ++++++++---------
 cpp/src/arrow/dataset/file_base.cc            | 17 ++++++-----------
 cpp/src/arrow/dataset/file_base.h             |  4 +---
 7 files changed, 32 insertions(+), 33 deletions(-)

diff --git a/cpp/examples/arrow/engine_substrait_consumption.cc b/cpp/examples/arrow/engine_substrait_consumption.cc
index b0109b36888..fe82949ef11 100644
--- a/cpp/examples/arrow/engine_substrait_consumption.cc
+++ b/cpp/examples/arrow/engine_substrait_consumption.cc
@@ -40,7 +40,8 @@ class IgnoringConsumer : public cp::SinkNodeConsumer {
  public:
   explicit IgnoringConsumer(size_t tag) : tag_{tag} {}
 
-  arrow::Status Consume(cp::ExecBatch batch) override {
+  arrow::Status Consume(cp::ExecBatch batch,
+                        const std::shared_ptr<arrow::Schema>& schema) override {
     // Consume a batch of data
     // (just print its row count to stdout)
     std::cout << "-" << tag_ << " consumed " << batch.length << " rows" << std::endl;
diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 81cdcef5301..2bd20255ae3 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -591,7 +591,8 @@ arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
     CustomSinkNodeConsumer(std::atomic<int>* batches_seen, arrow::Future<> finish)
         : batches_seen(batches_seen), finish(std::move(finish)) {}
 
-    arrow::Status Consume(cp::ExecBatch batch) override {
+    arrow::Status Consume(cp::ExecBatch batch,
+                          const std::shared_ptr<arrow::Schema>& schema) override {
       (*batches_seen)++;
       return arrow::Status::OK();
     }
@@ -794,7 +795,7 @@ arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
   write_options.partitioning = partitioning;
   write_options.basename_template = "part{i}.parquet";
 
-  arrow::dataset::WriteNodeOptions write_node_options{write_options, dataset->schema()};
+  arrow::dataset::WriteNodeOptions write_node_options{write_options};
 
   ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));
 
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index d5780753254..f0b3344b374 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -146,7 +146,7 @@ class ARROW_EXPORT SinkNodeConsumer {
  public:
   virtual ~SinkNodeConsumer() = default;
   /// \brief Consume a batch of data
-  virtual Status Consume(ExecBatch batch) = 0;
+  virtual Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) = 0;
   /// \brief Signal to the consumer that the last batch has been delivered
   ///
   /// The returned future should only finish when all outstanding tasks have completed
@@ -310,10 +310,9 @@ class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
  public:
   TableSinkNodeOptions(std::shared_ptr<Table>* output_table,
                        std::shared_ptr<Schema> output_schema)
-      : output_table(output_table), output_schema(std::move(output_schema)) {}
+      : output_table(output_table) {}
 
   std::shared_ptr<Table>* output_table;
-  std::shared_ptr<Schema> output_schema;
 };
 
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index e176c701b65..a446d987f97 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -493,7 +493,7 @@ TEST(ExecPlanExecution, SourceConsumingSink) {
     TestConsumer(std::atomic<int>* batches_seen, Future<> finish)
         : batches_seen(batches_seen), finish(std::move(finish)) {}
 
-    Status Consume(ExecBatch batch) override {
+    Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
       (*batches_seen)++;
       return Status::OK();
     }
@@ -561,11 +561,15 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {
 
 TEST(ExecPlanExecution, ConsumingSinkError) {
   struct ConsumeErrorConsumer : public SinkNodeConsumer {
-    Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); }
+    Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
+      return Status::Invalid("XYZ");
+    }
     Future<> Finish() override { return Future<>::MakeFinished(); }
   };
   struct FinishErrorConsumer : public SinkNodeConsumer {
-    Status Consume(ExecBatch batch) override { return Status::OK(); }
+    Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
+      return Status::OK();
+    }
     Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
   };
   std::vector<std::shared_ptr<SinkNodeConsumer>> consumers{
@@ -593,7 +597,9 @@ TEST(ExecPlanExecution, ConsumingSinkError) {
 TEST(ExecPlanExecution, ConsumingSinkErrorFinish) {
   ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
   struct FinishErrorConsumer : public SinkNodeConsumer {
-    Status Consume(ExecBatch batch) override { return Status::OK(); }
+    Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
+      return Status::OK();
+    }
     Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
   };
   std::shared_ptr<FinishErrorConsumer> consumer = std::make_shared<FinishErrorConsumer>();
diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc
index 13564c736b5..c2291fe21b8 100644
--- a/cpp/src/arrow/compute/exec/sink_node.cc
+++ b/cpp/src/arrow/compute/exec/sink_node.cc
@@ -213,7 +213,8 @@ class ConsumingSinkNode : public ExecNode {
       return;
     }
 
-    Status consumption_status = consumer_->Consume(std::move(batch));
+    Status consumption_status =
+        consumer_->Consume(std::move(batch), input->output_schema());
     if (!consumption_status.ok()) {
       if (input_counter_.Cancel()) {
         Finish(std::move(consumption_status));
@@ -268,13 +269,12 @@ class ConsumingSinkNode : public ExecNode {
 
 struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
  public:
-  TableSinkNodeConsumer(std::shared_ptr<Table>* out,
-                        std::shared_ptr<Schema> output_schema, MemoryPool* pool)
-      : out_(out), output_schema_(std::move(output_schema)), pool_(pool) {}
+  TableSinkNodeConsumer(std::shared_ptr<Table>* out, MemoryPool* pool)
+      : out_(out), pool_(pool) {}
 
-  Status Consume(ExecBatch batch) override {
+  Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
     std::lock_guard<std::mutex> guard(consume_mutex_);
-    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_));
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema, pool_));
     batches_.push_back(rb);
     return Status::OK();
   }
@@ -286,7 +286,6 @@ struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
 
  private:
   std::shared_ptr<Table>* out_;
-  std::shared_ptr<Schema> output_schema_;
   MemoryPool* pool_;
   std::vector<std::shared_ptr<RecordBatch>> batches_;
   std::mutex consume_mutex_;
@@ -298,8 +297,8 @@ static Result<ExecNode*> MakeTableConsumingSinkNode(
   RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TableConsumingSinkNode"));
   const auto& sink_options = checked_cast<const TableSinkNodeOptions&>(options);
   MemoryPool* pool = plan->exec_context()->memory_pool();
-  auto tb_consumer = std::make_shared<TableSinkNodeConsumer>(
-      sink_options.output_table, sink_options.output_schema, pool);
+  auto tb_consumer =
+      std::make_shared<TableSinkNodeConsumer>(sink_options.output_table, pool);
   auto consuming_sink_node_options = ConsumingSinkNodeOptions{tb_consumer};
   return MakeExecNode("consuming_sink", plan, inputs, consuming_sink_node_options);
 }
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 4d5e8e59dc8..b4c764ca41d 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -271,18 +271,16 @@ namespace {
 
 class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
  public:
-  DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> schema,
-                                 std::unique_ptr<internal::DatasetWriter> dataset_writer,
+  DatasetWritingSinkNodeConsumer(std::unique_ptr<internal::DatasetWriter> dataset_writer,
                                  FileSystemDatasetWriteOptions write_options,
                                  std::shared_ptr<util::AsyncToggle> backpressure_toggle)
-      : schema_(std::move(schema)),
-        dataset_writer_(std::move(dataset_writer)),
+      : dataset_writer_(std::move(dataset_writer)),
         write_options_(std::move(write_options)),
         backpressure_toggle_(std::move(backpressure_toggle)) {}
 
-  Status Consume(compute::ExecBatch batch) {
+  Status Consume(compute::ExecBatch batch, const std::shared_ptr<Schema>& schema) {
     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
-                          batch.ToRecordBatch(schema_));
+                          batch.ToRecordBatch(schema));
     return WriteNextBatch(std::move(record_batch), batch.guarantee);
   }
 
@@ -361,9 +359,7 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
           {"filter", compute::FilterNodeOptions{scanner->options()->filter}},
           {"project",
            compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
-          {"write",
-           WriteNodeOptions{write_options, scanner->options()->projected_schema,
-                            backpressure_toggle}},
+          {"write", WriteNodeOptions{write_options, backpressure_toggle}},
       })
       .AddToPlan(plan.get()));
 
@@ -382,7 +378,6 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
   const WriteNodeOptions write_node_options =
       checked_cast<const WriteNodeOptions&>(options);
   const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
-  const std::shared_ptr<Schema>& schema = write_node_options.schema;
   const std::shared_ptr<util::AsyncToggle>& backpressure_toggle =
       write_node_options.backpressure_toggle;
 
@@ -391,7 +386,7 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
 
   std::shared_ptr<compute::SinkNodeConsumer> consumer =
       std::make_shared<DatasetWritingSinkNodeConsumer>(
-          schema, std::move(dataset_writer), write_options, backpressure_toggle);
+          std::move(dataset_writer), write_options, backpressure_toggle);
 
   ARROW_ASSIGN_OR_RAISE(
       auto node,
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 07b156778f6..f0e5f1fa9c3 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -408,14 +408,12 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
 class ARROW_DS_EXPORT WriteNodeOptions : public compute::ExecNodeOptions {
  public:
   explicit WriteNodeOptions(
-      FileSystemDatasetWriteOptions options, std::shared_ptr<Schema> schema,
+      FileSystemDatasetWriteOptions options,
       std::shared_ptr<util::AsyncToggle> backpressure_toggle = NULLPTR)
       : write_options(std::move(options)),
-        schema(std::move(schema)),
         backpressure_toggle(std::move(backpressure_toggle)) {}
 
   FileSystemDatasetWriteOptions write_options;
-  std::shared_ptr<Schema> schema;
   std::shared_ptr<util::AsyncToggle> backpressure_toggle;
 };
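Note on the interface above: after this first patch, the schema travels with every
batch, so a consumer can interpret batches without a separate setup step. A minimal
sketch of a consumer written against this interim signature (illustrative only; the
class name and row counter are invented for the example, and the next patch replaces
this signature with an Init/Consume/Finish model, so nothing should build against it):

    class RowCountingConsumer : public arrow::compute::SinkNodeConsumer {
     public:
      arrow::Status Consume(arrow::compute::ExecBatch batch,
                            const std::shared_ptr<arrow::Schema>& schema) override {
        // The schema arrives with each call, so nothing needs to be captured
        // up front; this example only tallies the rows flowing into the sink.
        rows_seen_ += batch.length;
        return arrow::Status::OK();
      }

      arrow::Future<> Finish() override { return arrow::Future<>::MakeFinished(); }

     private:
      int64_t rows_seen_ = 0;
    };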
From e8c6ce0579507f74d8ac9f99be47100f2d2c7229 Mon Sep 17 00:00:00 2001
From: Weston Pace <weston.pace@gmail.com>
Date: Mon, 28 Mar 2022 16:00:38 -1000
Subject: [PATCH 2/7] ARROW-16033: Moved to an Init/Consume/Finish model instead
 of supplying the schema to Consume.  Added back in support for custom metadata,
 which is needed for the Python tests to pass.

---
 .../arrow/engine_substrait_consumption.cc     |  7 ++-
 .../execution_plan_documentation_examples.cc  |  7 ++-
 cpp/src/arrow/compute/exec/options.h          |  8 ++-
 cpp/src/arrow/compute/exec/plan_test.cc       | 60 ++++++++-----------
 cpp/src/arrow/compute/exec/sink_node.cc       | 15 +++--
 cpp/src/arrow/dataset/file_base.cc            | 35 ++++++++---
 cpp/src/arrow/dataset/file_base.h             |  6 ++
 7 files changed, 86 insertions(+), 52 deletions(-)

diff --git a/cpp/examples/arrow/engine_substrait_consumption.cc b/cpp/examples/arrow/engine_substrait_consumption.cc
index fe82949ef11..d74f6749650 100644
--- a/cpp/examples/arrow/engine_substrait_consumption.cc
+++ b/cpp/examples/arrow/engine_substrait_consumption.cc
@@ -40,8 +40,11 @@ class IgnoringConsumer : public cp::SinkNodeConsumer {
  public:
   explicit IgnoringConsumer(size_t tag) : tag_{tag} {}
 
-  arrow::Status Consume(cp::ExecBatch batch,
-                        const std::shared_ptr<arrow::Schema>& schema) override {
+  arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Consume(cp::ExecBatch batch) override {
     // Consume a batch of data
     // (just print its row count to stdout)
     std::cout << "-" << tag_ << " consumed " << batch.length << " rows" << std::endl;
diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 2bd20255ae3..0505af223ed 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -591,8 +591,11 @@ arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
     CustomSinkNodeConsumer(std::atomic<int>* batches_seen, arrow::Future<> finish)
         : batches_seen(batches_seen), finish(std::move(finish)) {}
 
-    arrow::Status Consume(cp::ExecBatch batch,
-                          const std::shared_ptr<arrow::Schema>& schema) override {
+    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
+      return arrow::Status::OK();
+    }
+
+    arrow::Status Consume(cp::ExecBatch batch) override {
       (*batches_seen)++;
       return arrow::Status::OK();
     }
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index f0b3344b374..2ed4972dc7c 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -145,8 +145,14 @@ class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
 class ARROW_EXPORT SinkNodeConsumer {
  public:
   virtual ~SinkNodeConsumer() = default;
+  /// \brief Prepare any consumer state
+  ///
+  /// This will be run once the schema is finalized as the plan is starting and
+  /// before any calls to Consume.  A common use is to save off the schema so that
+  /// batches can be interpreted.
+  virtual Status Init(const std::shared_ptr<Schema>& schema) = 0;
   /// \brief Consume a batch of data
-  virtual Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) = 0;
+  virtual Status Consume(ExecBatch batch) = 0;
   /// \brief Signal to the consumer that the last batch has been delivered
   ///
   /// The returned future should only finish when all outstanding tasks have completed
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index a446d987f97..e30a9562ada 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -493,7 +493,11 @@ TEST(ExecPlanExecution, SourceConsumingSink) {
     TestConsumer(std::atomic<int>* batches_seen, Future<> finish)
         : batches_seen(batches_seen), finish(std::move(finish)) {}
 
-    Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
+    Status Init(const std::shared_ptr<Schema>& schema) override {
+      return Status::OK();
+    }
+
+    Status Consume(ExecBatch batch) override {
       (*batches_seen)++;
       return Status::OK();
     }
@@ -560,20 +564,26 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {
 }
 
 TEST(ExecPlanExecution, ConsumingSinkError) {
-  struct ConsumeErrorConsumer : public SinkNodeConsumer {
-    Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
+  struct InitErrorConsumer : public SinkNodeConsumer {
+    Status Init(const std::shared_ptr<Schema>& schema) override {
       return Status::Invalid("XYZ");
     }
+    Status Consume(ExecBatch batch) override { return Status::OK(); }
+    Future<> Finish() override { return Future<>::MakeFinished(); }
+  };
+  struct ConsumeErrorConsumer : public SinkNodeConsumer {
+    Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); }
+    Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); }
     Future<> Finish() override { return Future<>::MakeFinished(); }
   };
   struct FinishErrorConsumer : public SinkNodeConsumer {
-    Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
-      return Status::OK();
-    }
+    Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); }
+    Status Consume(ExecBatch batch) override { return Status::OK(); }
     Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
   };
   std::vector<std::shared_ptr<SinkNodeConsumer>> consumers{
-      std::make_shared<ConsumeErrorConsumer>(), std::make_shared<FinishErrorConsumer>()};
+      std::make_shared<InitErrorConsumer>(), std::make_shared<ConsumeErrorConsumer>(),
+      std::make_shared<FinishErrorConsumer>()};
 
   for (auto& consumer : consumers) {
     ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
@@ -589,35 +599,15 @@ TEST(ExecPlanExecution, ConsumingSinkError) {
         SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
     ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
                            ConsumingSinkNodeOptions(consumer)));
-    ASSERT_OK(plan->StartProducing());
-    ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
-  }
-}
-
-TEST(ExecPlanExecution, ConsumingSinkErrorFinish) {
-  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-  struct FinishErrorConsumer : public SinkNodeConsumer {
-    Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
-      return Status::OK();
+    // If we fail at init we see it during StartProducing.  Other
+    // failures are not seen until we start running.
+    if (std::dynamic_pointer_cast<InitErrorConsumer>(consumer)) {
+      ASSERT_RAISES(Invalid, plan->StartProducing());
+    } else {
+      ASSERT_OK(plan->StartProducing());
+      ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
     }
-    Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
-  };
-  std::shared_ptr<FinishErrorConsumer> consumer = std::make_shared<FinishErrorConsumer>();
-
-  auto basic_data = MakeBasicBatches();
-  ASSERT_OK(
-      Declaration::Sequence(
-          {{"source", SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
-           {"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
-          .AddToPlan(plan.get()));
-  ASSERT_OK_AND_ASSIGN(
-      auto source,
-      MakeExecNode("source", plan.get(), {},
-                   SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
-  ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
-                         ConsumingSinkNodeOptions(consumer)));
-  ASSERT_OK(plan->StartProducing());
-  ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
+  }
 }
 
 TEST(ExecPlanExecution, StressSourceSink) {
diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc
index c2291fe21b8..e981de38996 100644
--- a/cpp/src/arrow/compute/exec/sink_node.cc
+++ b/cpp/src/arrow/compute/exec/sink_node.cc
@@ -178,6 +178,8 @@ class ConsumingSinkNode : public ExecNode {
                        {{"node.label", label()},
                         {"node.detail", ToString()},
                         {"node.kind", kind_name()}});
+    DCHECK_GT(inputs_.size(), 0);
+    RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema()));
     finished_ = Future<>::Make();
     END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this);
     return Status::OK();
@@ -213,8 +215,7 @@ class ConsumingSinkNode : public ExecNode {
       return;
     }
 
-    Status consumption_status =
-        consumer_->Consume(std::move(batch), input->output_schema());
+    Status consumption_status = consumer_->Consume(std::move(batch));
     if (!consumption_status.ok()) {
       if (input_counter_.Cancel()) {
         Finish(std::move(consumption_status));
@@ -272,9 +273,14 @@ struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
   TableSinkNodeConsumer(std::shared_ptr<Table>* out, MemoryPool* pool)
       : out_(out), pool_(pool) {}
 
-  Status Consume(ExecBatch batch, const std::shared_ptr<Schema>& schema) override {
+  Status Init(const std::shared_ptr<Schema>& schema) override {
+    schema_ = schema;
+    return Status::OK();
+  }
+
+  Status Consume(ExecBatch batch) override {
     std::lock_guard<std::mutex> guard(consume_mutex_);
-    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema, pool_));
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema_, pool_));
     batches_.push_back(rb);
     return Status::OK();
   }
@@ -287,6 +293,7 @@ struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
  private:
   std::shared_ptr<Table>* out_;
   MemoryPool* pool_;
+  std::shared_ptr<Schema> schema_;
   std::vector<std::shared_ptr<RecordBatch>> batches_;
   std::mutex consume_mutex_;
 };
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index b4c764ca41d..4c66053e5be 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -271,20 +271,31 @@ namespace {
 
 class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
  public:
-  DatasetWritingSinkNodeConsumer(std::unique_ptr<internal::DatasetWriter> dataset_writer,
+  DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
+                                 std::unique_ptr<internal::DatasetWriter> dataset_writer,
                                  FileSystemDatasetWriteOptions write_options,
                                  std::shared_ptr<util::AsyncToggle> backpressure_toggle)
-      : dataset_writer_(std::move(dataset_writer)),
+      : custom_metadata_(std::move(custom_metadata)),
+        dataset_writer_(std::move(dataset_writer)),
         write_options_(std::move(write_options)),
         backpressure_toggle_(std::move(backpressure_toggle)) {}
 
-  Status Consume(compute::ExecBatch batch, const std::shared_ptr<Schema>& schema) {
+  Status Init(const std::shared_ptr<Schema>& schema) override {
+    if (custom_metadata_) {
+      schema_ = schema->WithMetadata(custom_metadata_);
+    } else {
+      schema_ = schema;
+    }
+    return Status::OK();
+  }
+
+  Status Consume(compute::ExecBatch batch) override {
     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
-                          batch.ToRecordBatch(schema));
+                          batch.ToRecordBatch(schema_));
     return WriteNextBatch(std::move(record_batch), batch.guarantee);
   }
 
-  Future<> Finish() {
+  Future<> Finish() override {
     RETURN_NOT_OK(task_group_.AddTask([this] { return dataset_writer_->Finish(); }));
     return task_group_.End();
   }
@@ -325,11 +336,12 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
     return Status::OK();
   }
 
-  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<const KeyValueMetadata> custom_metadata_;
   std::unique_ptr<internal::DatasetWriter> dataset_writer_;
   FileSystemDatasetWriteOptions write_options_;
   std::shared_ptr<util::AsyncToggle> backpressure_toggle_;
   util::SerializedAsyncTaskGroup task_group_;
+  std::shared_ptr<Schema> schema_ = nullptr;
 };
 
 }  // namespace
@@ -352,6 +364,10 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
   std::shared_ptr<util::AsyncToggle> backpressure_toggle =
       std::make_shared<util::AsyncToggle>();
 
+  // The projected_schema is currently used by pyarrow to preserve the custom metadata
+  // when reading from a single input file.
+  const auto& custom_metadata = scanner->options()->projected_schema->metadata();
+
   RETURN_NOT_OK(
       compute::Declaration::Sequence(
           {
@@ -359,7 +375,8 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
           {"filter", compute::FilterNodeOptions{scanner->options()->filter}},
           {"project",
            compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
-          {"write", WriteNodeOptions{write_options, backpressure_toggle}},
+          {"write",
+           WriteNodeOptions{write_options, custom_metadata, backpressure_toggle}},
       })
       .AddToPlan(plan.get()));
 
@@ -378,6 +395,8 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
   const WriteNodeOptions write_node_options =
       checked_cast<const WriteNodeOptions&>(options);
+  const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
+      write_node_options.custom_metadata;
   const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
   const std::shared_ptr<util::AsyncToggle>& backpressure_toggle =
       write_node_options.backpressure_toggle;
@@ -386,7 +405,7 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
 
   std::shared_ptr<compute::SinkNodeConsumer> consumer =
       std::make_shared<DatasetWritingSinkNodeConsumer>(
-          std::move(dataset_writer), write_options, backpressure_toggle);
+          custom_metadata, std::move(dataset_writer), write_options, backpressure_toggle);
 
   ARROW_ASSIGN_OR_RAISE(
       auto node,
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index f0e5f1fa9c3..ca8b7e6450f 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -409,11 +409,17 @@ class ARROW_DS_EXPORT WriteNodeOptions : public compute::ExecNodeOptions {
  public:
   explicit WriteNodeOptions(
       FileSystemDatasetWriteOptions options,
+      std::shared_ptr<const KeyValueMetadata> custom_metadata = NULLPTR,
       std::shared_ptr<util::AsyncToggle> backpressure_toggle = NULLPTR)
       : write_options(std::move(options)),
+        custom_metadata(std::move(custom_metadata)),
        backpressure_toggle(std::move(backpressure_toggle)) {}
 
+  /// \brief Options to control how to write the dataset
   FileSystemDatasetWriteOptions write_options;
+  /// \brief Optional metadata to attach to written batches
+  std::shared_ptr<const KeyValueMetadata> custom_metadata;
+  /// \brief Optional toggle that can be used to pause producers when the node is full
   std::shared_ptr<util::AsyncToggle> backpressure_toggle;
 };
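Note on the model above: after this second patch, a consumer is initialized exactly
once with the finalized schema as the plan starts, then receives only batches, and is
finally asked to finish. A sketch of a consumer written against this
Init/Consume/Finish contract (illustrative, not part of the patch; it mirrors
TableSinkNodeConsumer from sink_node.cc, including a mutex because Consume may be
called from multiple threads; includes of <mutex>, <vector>, and the Arrow compute
headers are assumed):

    class CollectingConsumer : public arrow::compute::SinkNodeConsumer {
     public:
      arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
        // Called once, before any Consume, with the plan's finalized schema.
        schema_ = schema;
        return arrow::Status::OK();
      }

      arrow::Status Consume(arrow::compute::ExecBatch batch) override {
        // Interpret the batch using the schema captured in Init.
        ARROW_ASSIGN_OR_RAISE(auto record_batch, batch.ToRecordBatch(schema_));
        std::lock_guard<std::mutex> guard(mutex_);
        batches_.push_back(std::move(record_batch));
        return arrow::Status::OK();
      }

      arrow::Future<> Finish() override {
        // Finish only when all outstanding work has completed.
        return arrow::Future<>::MakeFinished();
      }

     private:
      std::shared_ptr<arrow::Schema> schema_;
      std::mutex mutex_;
      std::vector<std::shared_ptr<arrow::RecordBatch>> batches_;
    };

A consumer like this is attached through ConsumingSinkNodeOptions, as in the tests
above; the node calls Init from StartProducing, which is why InitErrorConsumer's
failure surfaces there rather than from plan->finished().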
From 7bef054c569b8097e048dac479424e319c2abc38 Mon Sep 17 00:00:00 2001
From: Weston Pace <weston.pace@gmail.com>
Date: Tue, 29 Mar 2022 10:40:08 -1000
Subject: [PATCH 3/7] ARROW-16033: Removed unused schema argument from
 TableSinkNodeOptions

---
 cpp/src/arrow/compute/exec/options.h | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 2ed4972dc7c..38b1cf637f7 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -314,8 +314,7 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
 /// a table pointer.
 class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
  public:
-  TableSinkNodeOptions(std::shared_ptr<Table>* output_table,
-                       std::shared_ptr<Schema> output_schema)
+  TableSinkNodeOptions(std::shared_ptr<Table>* output_table)
       : output_table(output_table) {}
 
   std::shared_ptr<Table>* output_table;

From da90068ddf117f409f5be67cf48ec4959c5cee97 Mon Sep 17 00:00:00 2001
From: Weston Pace <weston.pace@gmail.com>
Date: Tue, 29 Mar 2022 14:55:25 -1000
Subject: [PATCH 4/7] ARROW-16033: Lint

---
 cpp/src/arrow/compute/exec/options.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 38b1cf637f7..259e467d97e 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -314,7 +314,7 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
 /// a table pointer.
 class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
  public:
-  TableSinkNodeOptions(std::shared_ptr<Table>* output_table)
+  explicit TableSinkNodeOptions(std::shared_ptr<Table>* output_table)
       : output_table(output_table) {}
 
   std::shared_ptr<Table>* output_table;

From a0b1f46efb4fbd3814d2f0760280092e9deb336e Mon Sep 17 00:00:00 2001
From: Weston Pace <weston.pace@gmail.com>
Date: Tue, 29 Mar 2022 15:20:35 -1000
Subject: [PATCH 5/7] ARROW-16033: Forgot to update example

---
 cpp/src/arrow/compute/exec/plan_test.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index e30a9562ada..615dec33fa0 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -543,7 +543,7 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {
 
   auto basic_data = MakeBasicBatches();
 
-  TableSinkNodeOptions options{&out, basic_data.schema};
+  TableSinkNodeOptions options{&out};
   ASSERT_OK_AND_ASSIGN(
       auto source, MakeExecNode("source", plan.get(), {},
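With patches 3 through 5, TableSinkNodeOptions no longer carries a schema at all; the
table sink learns it through Init like any other consumer. A usage sketch
(illustrative, not part of the series; it assumes these options are consumed by the
"table_sink" factory and that `plan` and `source` already exist, as in the
SourceTableConsumingSink test above):

    std::shared_ptr<arrow::Table> out;
    arrow::compute::TableSinkNodeOptions options{&out};
    // The sink infers its schema from its input once the plan starts.
    ARROW_ASSIGN_OR_RAISE(auto sink, arrow::compute::MakeExecNode(
                                         "table_sink", plan.get(), {source}, options));
    // After the plan finishes, `out` holds the accumulated table.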
From 9ad85ce2e2213cd2f4a55094ea52d3cc98b3a29a Mon Sep 17 00:00:00 2001
From: Weston Pace <weston.pace@gmail.com>
Date: Tue, 29 Mar 2022 17:31:21 -1000
Subject: [PATCH 6/7] ARROW-16033: Update R's consuming sink to the new model

---
 r/R/arrowExports.R                   |  5 ++--
 r/R/query-engine.R                   |  4 ++--
 r/src/arrowExports.cpp               | 23 +++++++++----------
 r/src/compute-exec.cpp               | 34 ++++++++--------------------
 r/tests/testthat/test-query-engine.R |  2 +-
 5 files changed, 26 insertions(+), 42 deletions(-)

diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index c07a66e78d3..f43ef730caa 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -452,8 +452,8 @@ engine__internal__SubstraitFromJSON <- function(substrait_json) {
   .Call(`_arrow_engine__internal__SubstraitFromJSON`, substrait_json)
 }
 
-ExecPlan_run_substrait <- function(plan, serialized_plan, out_names) {
-  .Call(`_arrow_ExecPlan_run_substrait`, plan, serialized_plan, out_names)
+ExecPlan_run_substrait <- function(plan, serialized_plan) {
+  .Call(`_arrow_ExecPlan_run_substrait`, plan, serialized_plan)
 }
 
 RecordBatch__cast <- function(batch, schema, options) {
@@ -1943,3 +1943,4 @@ SetIOThreadPoolCapacity <- function(threads) {
 Array__infer_type <- function(x) {
   .Call(`_arrow_Array__infer_type`, x)
 }
+
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 429ee2f50e9..6c1b14036f1 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -305,7 +305,7 @@ ExecNode <- R6Class("ExecNode",
   )
 )
 
-do_exec_plan_substrait <- function(substrait_plan, output_names) {
+do_exec_plan_substrait <- function(substrait_plan) {
   if (is.string(substrait_plan)) {
     substrait_plan <- engine__internal__SubstraitFromJSON(substrait_plan)
   } else if (is.raw(substrait_plan)) {
@@ -315,5 +315,5 @@ do_exec_plan_substrait <- function(substrait_plan, output_names) {
   }
 
   plan <- ExecPlan$create()
-  ExecPlan_run_substrait(plan, substrait_plan, output_names)
+  ExecPlan_run_substrait(plan, substrait_plan)
 }
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 47a65190400..45a883321da 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1781,17 +1781,16 @@ extern "C" SEXP _arrow_engine__internal__SubstraitFromJSON(SEXP substrait_json_s
 
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_ENGINE)
-std::shared_ptr<arrow::RecordBatchReader> ExecPlan_run_substrait(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<arrow::Buffer>& serialized_plan, cpp11::strings out_names);
-extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp, SEXP out_names_sexp){
+std::shared_ptr<arrow::RecordBatchReader> ExecPlan_run_substrait(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<arrow::Buffer>& serialized_plan);
+extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp){
 BEGIN_CPP11
	arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
	arrow::r::Input<const std::shared_ptr<arrow::Buffer>&>::type serialized_plan(serialized_plan_sexp);
-	arrow::r::Input<cpp11::strings>::type out_names(out_names_sexp);
-	return cpp11::as_sexp(ExecPlan_run_substrait(plan, serialized_plan, out_names));
+	return cpp11::as_sexp(ExecPlan_run_substrait(plan, serialized_plan));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp, SEXP out_names_sexp){
+extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp){
	Rf_error("Cannot call ExecPlan_run_substrait(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
@@ -7708,12 +7707,12 @@ return Rf_ScalarLogical(
 );
 }
 
 static const R_CallMethodDef CallEntries[] = {
-	{ "_arrow_available", (DL_FUNC)& _arrow_available, 0 },
-	{ "_dataset_available", (DL_FUNC)& _dataset_available, 0 },
-	{ "_engine_available", (DL_FUNC)& _engine_available, 0 },
-	{ "_parquet_available", (DL_FUNC)& _parquet_available, 0 },
-	{ "_s3_available", (DL_FUNC)& _s3_available, 0 },
-	{ "_json_available", (DL_FUNC)& _json_available, 0 },
+{ "_arrow_available", (DL_FUNC)& _arrow_available, 0 },
+{ "_dataset_available", (DL_FUNC)& _dataset_available, 0 },
+{ "_engine_available", (DL_FUNC)& _engine_available, 0 },
+{ "_parquet_available", (DL_FUNC)& _parquet_available, 0 },
+{ "_s3_available", (DL_FUNC)& _s3_available, 0 },
+{ "_json_available", (DL_FUNC)& _json_available, 0 },
	{ "_arrow_test_SET_STRING_ELT", (DL_FUNC) &_arrow_test_SET_STRING_ELT, 1},
	{ "_arrow_is_arrow_altrep", (DL_FUNC) &_arrow_is_arrow_altrep, 1},
	{ "_arrow_Array__Slice1", (DL_FUNC) &_arrow_Array__Slice1, 2},
@@ -7827,7 +7826,7 @@ static const R_CallMethodDef CallEntries[] = {
	{ "_arrow_ExecNode_TableSourceNode", (DL_FUNC) &_arrow_ExecNode_TableSourceNode, 2},
	{ "_arrow_engine__internal__SubstraitToJSON", (DL_FUNC) &_arrow_engine__internal__SubstraitToJSON, 1},
	{ "_arrow_engine__internal__SubstraitFromJSON", (DL_FUNC) &_arrow_engine__internal__SubstraitFromJSON, 1},
-	{ "_arrow_ExecPlan_run_substrait", (DL_FUNC) &_arrow_ExecPlan_run_substrait, 3},
+	{ "_arrow_ExecPlan_run_substrait", (DL_FUNC) &_arrow_ExecPlan_run_substrait, 2},
	{ "_arrow_RecordBatch__cast", (DL_FUNC) &_arrow_RecordBatch__cast, 3},
	{ "_arrow_Table__cast", (DL_FUNC) &_arrow_Table__cast, 3},
	{ "_arrow_compute__CallFunction", (DL_FUNC) &_arrow_compute__CallFunction, 3},
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 0e52c0a3981..a1a679144da 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -299,27 +299,15 @@ std::shared_ptr<compute::ExecNode> ExecNode_TableSourceNode(
 // a RecordBatchReader output (ARROW-15849)
 class AccumulatingConsumer : public compute::SinkNodeConsumer {
  public:
-  explicit AccumulatingConsumer(const std::vector<std::string>& schema_names)
-      : schema_names_(schema_names) {}
-
   const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches() { return batches_; }
 
-  arrow::Status Consume(compute::ExecBatch batch) override {
-    arrow::SchemaBuilder builder;
-    auto descriptors = batch.GetDescriptors();
-    for (int64_t i = 0; i < schema_names_.size(); i++) {
-      if (i == (descriptors.size() - 1)) {
-        break;
-      }
-
-      RETURN_NOT_OK(builder.AddField(
-          std::make_shared<arrow::Field>(schema_names_[i], descriptors[i].type)));
-    }
-
-    auto schema = builder.Finish();
-    RETURN_NOT_OK(schema);
+  arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) {
+    schema_ = schema;
+    return arrow::Status::OK();
+  }
 
-    auto record_batch = batch.ToRecordBatch(schema.ValueUnsafe());
+  arrow::Status Consume(compute::ExecBatch batch) override {
+    auto record_batch = batch.ToRecordBatch(schema_);
     ARROW_RETURN_NOT_OK(record_batch);
     batches_.push_back(record_batch.ValueUnsafe());
 
@@ -329,7 +317,7 @@ class AccumulatingConsumer : public compute::SinkNodeConsumer {
   arrow::Future<> Finish() override { return arrow::Future<>::MakeFinished(); }
 
  private:
-  std::vector<std::string> schema_names_;
+  std::shared_ptr<arrow::Schema> schema_;
   std::vector<std::shared_ptr<arrow::RecordBatch>> batches_;
 };
 
@@ -350,15 +338,11 @@ std::shared_ptr<arrow::Buffer> engine__internal__SubstraitFromJSON(
 
 // [[engine::export]]
 std::shared_ptr<arrow::RecordBatchReader> ExecPlan_run_substrait(
     const std::shared_ptr<compute::ExecPlan>& plan,
-    const std::shared_ptr<arrow::Buffer>& serialized_plan, cpp11::strings out_names) {
+    const std::shared_ptr<arrow::Buffer>& serialized_plan) {
   std::vector<std::shared_ptr<AccumulatingConsumer>> consumers;
-  std::vector<std::string> out_names_string;
-  for (const auto& item : out_names) {
-    out_names_string.push_back(item);
-  }
 
   std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] {
-    consumers.emplace_back(new AccumulatingConsumer(out_names_string));
+    consumers.emplace_back(new AccumulatingConsumer());
     return consumers.back();
   };
diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R
index 8293267d7cc..55372742504 100644
--- a/r/tests/testthat/test-query-engine.R
+++ b/r/tests/testthat/test-query-engine.R
@@ -55,7 +55,7 @@ test_that("do_exec_plan_substrait can evaluate a simple plan", {
   substrait_json_roundtrip <- engine__internal__SubstraitToJSON(substrait_buffer)
   expect_match(substrait_json_roundtrip, tf, fixed = TRUE)
 
-  result <- do_exec_plan_substrait(substrait_json, names(df))
+  result <- do_exec_plan_substrait(substrait_json)
 
   expect_identical(
     tibble::as_tibble(result),
     tibble::as_tibble(df)

From aad80397f79f542e09360ead42e5e31ecb6259a0 Mon Sep 17 00:00:00 2001
From: Weston Pace <weston.pace@gmail.com>
Date: Mon, 4 Apr 2022 14:57:44 -1000
Subject: [PATCH 7/7] ARROW-16033: Add workaround for ARROW-15585 exposed by
 this change

---
 r/tests/testthat/test-query-engine.R | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R
index 55372742504..3ef2d3aa1cb 100644
--- a/r/tests/testthat/test-query-engine.R
+++ b/r/tests/testthat/test-query-engine.R
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+library(dplyr, warn.conflicts = FALSE)
+
 test_that("do_exec_plan_substrait can evaluate a simple plan", {
   skip_if_not_available("engine")
 
@@ -57,7 +59,8 @@ test_that("do_exec_plan_substrait can evaluate a simple plan", {
   result <- do_exec_plan_substrait(substrait_json)
 
   expect_identical(
-    tibble::as_tibble(result),
+    # TODO(ARROW-15585) The "select(i, b)" should not be needed
+    tibble::as_tibble(result) %>% select(i, b),
     tibble::as_tibble(df)
   )
 })
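Taken together with patch 2, a writer can attach custom metadata without threading a
schema through WriteNodeOptions; DatasetWritingSinkNodeConsumer::Init re-attaches the
metadata to whatever schema the plan finalizes. A sketch extending the
ScanFilterWriteExample from patch 1 (illustrative; the "origin" key and value are
invented, and `write_options`, `plan`, and `scan` are the variables from that example,
with `cp` the example's alias for arrow::compute):

    // Metadata that Init will merge onto the finalized schema via WithMetadata.
    std::shared_ptr<arrow::KeyValueMetadata> metadata =
        arrow::key_value_metadata({"origin"}, {"example"});
    arrow::dataset::WriteNodeOptions write_node_options{write_options, metadata};
    ARROW_RETURN_NOT_OK(
        cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));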