diff --git a/cpp/examples/arrow/engine_substrait_consumption.cc b/cpp/examples/arrow/engine_substrait_consumption.cc
index b0109b36888..d74f6749650 100644
--- a/cpp/examples/arrow/engine_substrait_consumption.cc
+++ b/cpp/examples/arrow/engine_substrait_consumption.cc
@@ -40,6 +40,10 @@ class IgnoringConsumer : public cp::SinkNodeConsumer {
  public:
   explicit IgnoringConsumer(size_t tag) : tag_{tag} {}
 
+  arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
+    return arrow::Status::OK();
+  }
+
   arrow::Status Consume(cp::ExecBatch batch) override {
     // Consume a batch of data
     // (just print its row count to stdout)
diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 81cdcef5301..0505af223ed 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -591,6 +591,10 @@ arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
     CustomSinkNodeConsumer(std::atomic<int>* batches_seen, arrow::Future<> finish)
         : batches_seen(batches_seen), finish(std::move(finish)) {}
 
+    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
+      return arrow::Status::OK();
+    }
+
     arrow::Status Consume(cp::ExecBatch batch) override {
       (*batches_seen)++;
       return arrow::Status::OK();
@@ -794,7 +798,7 @@ arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
   write_options.partitioning = partitioning;
   write_options.basename_template = "part{i}.parquet";
 
-  arrow::dataset::WriteNodeOptions write_node_options{write_options, dataset->schema()};
+  arrow::dataset::WriteNodeOptions write_node_options{write_options};
 
   ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));
 
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index d5780753254..259e467d97e 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -145,6 +145,12 @@ class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
 class ARROW_EXPORT SinkNodeConsumer {
  public:
   virtual ~SinkNodeConsumer() = default;
+  /// \brief Prepare any consumer state
+  ///
+  /// This will be run once the schema is finalized as the plan is starting and
+  /// before any calls to Consume.  A common use is to save off the schema so that
+  /// batches can be interpreted.
+  virtual Status Init(const std::shared_ptr<Schema>& schema) = 0;
   /// \brief Consume a batch of data
   virtual Status Consume(ExecBatch batch) = 0;
   /// \brief Signal to the consumer that the last batch has been delivered
@@ -308,12 +314,10 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
 /// a table pointer.
 class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
  public:
-  TableSinkNodeOptions(std::shared_ptr<Table>* output_table,
-                       std::shared_ptr<Schema> output_schema)
-      : output_table(output_table), output_schema(std::move(output_schema)) {}
+  explicit TableSinkNodeOptions(std::shared_ptr<Table>* output_table)
+      : output_table(output_table) {}
 
   std::shared_ptr<Table>* output_table;
-  std::shared_ptr<Schema> output_schema;
 };
 
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index e176c701b65..615dec33fa0 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -493,6 +493,10 @@ TEST(ExecPlanExecution, SourceConsumingSink) {
     TestConsumer(std::atomic<uint32_t>* batches_seen, Future<> finish)
         : batches_seen(batches_seen), finish(std::move(finish)) {}
 
+    Status Init(const std::shared_ptr<Schema>& schema) override {
+      return Status::OK();
+    }
+
     Status Consume(ExecBatch batch) override {
       (*batches_seen)++;
       return Status::OK();
@@ -539,7 +543,7 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {
 
   auto basic_data = MakeBasicBatches();
 
-  TableSinkNodeOptions options{&out, basic_data.schema};
+  TableSinkNodeOptions options{&out};
 
   ASSERT_OK_AND_ASSIGN(
       auto source,
       MakeExecNode("source", plan.get(), {},
@@ -560,16 +564,26 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {
 }
 
 TEST(ExecPlanExecution, ConsumingSinkError) {
+  struct InitErrorConsumer : public SinkNodeConsumer {
+    Status Init(const std::shared_ptr<Schema>& schema) override {
+      return Status::Invalid("XYZ");
+    }
+    Status Consume(ExecBatch batch) override { return Status::OK(); }
+    Future<> Finish() override { return Future<>::MakeFinished(); }
+  };
   struct ConsumeErrorConsumer : public SinkNodeConsumer {
+    Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); }
     Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); }
     Future<> Finish() override { return Future<>::MakeFinished(); }
   };
   struct FinishErrorConsumer : public SinkNodeConsumer {
+    Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); }
     Status Consume(ExecBatch batch) override { return Status::OK(); }
     Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
   };
   std::vector<std::shared_ptr<SinkNodeConsumer>> consumers{
-      std::make_shared<ConsumeErrorConsumer>(), std::make_shared<FinishErrorConsumer>()};
+      std::make_shared<InitErrorConsumer>(), std::make_shared<ConsumeErrorConsumer>(),
+      std::make_shared<FinishErrorConsumer>()};
 
   for (auto& consumer : consumers) {
     ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
@@ -585,35 +599,17 @@ TEST(ExecPlanExecution, ConsumingSinkError) {
             SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
     ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
                            ConsumingSinkNodeOptions(consumer)));
-    ASSERT_OK(plan->StartProducing());
-    ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
+    // If we fail at init we see it during StartProducing.  Other
+    // failures are not seen until we start running.
+    if (std::dynamic_pointer_cast<InitErrorConsumer>(consumer)) {
+      ASSERT_RAISES(Invalid, plan->StartProducing());
+    } else {
+      ASSERT_OK(plan->StartProducing());
+      ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
+    }
   }
 }
 
-TEST(ExecPlanExecution, ConsumingSinkErrorFinish) {
-  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-  struct FinishErrorConsumer : public SinkNodeConsumer {
-    Status Consume(ExecBatch batch) override { return Status::OK(); }
-    Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
-  };
-  std::shared_ptr<FinishErrorConsumer> consumer = std::make_shared<FinishErrorConsumer>();
-
-  auto basic_data = MakeBasicBatches();
-  ASSERT_OK(
-      Declaration::Sequence(
-          {{"source", SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
-           {"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
-          .AddToPlan(plan.get()));
-  ASSERT_OK_AND_ASSIGN(
-      auto source,
-      MakeExecNode("source", plan.get(), {},
-                   SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
-  ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
-                         ConsumingSinkNodeOptions(consumer)));
-  ASSERT_OK(plan->StartProducing());
-  ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
-}
-
 TEST(ExecPlanExecution, StressSourceSink) {
   for (bool slow : {false, true}) {
     SCOPED_TRACE(slow ? "slowed" : "unslowed");
diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc
index 13564c736b5..e981de38996 100644
--- a/cpp/src/arrow/compute/exec/sink_node.cc
+++ b/cpp/src/arrow/compute/exec/sink_node.cc
@@ -178,6 +178,8 @@ class ConsumingSinkNode : public ExecNode {
                        {{"node.label", label()},
                         {"node.detail", ToString()},
                         {"node.kind", kind_name()}});
+    DCHECK_GT(inputs_.size(), 0);
+    RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema()));
     finished_ = Future<>::Make();
     END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this);
     return Status::OK();
@@ -268,13 +270,17 @@ class ConsumingSinkNode : public ExecNode {
 
 struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
  public:
-  TableSinkNodeConsumer(std::shared_ptr<Table>* out,
-                        std::shared_ptr<Schema> output_schema, MemoryPool* pool)
-      : out_(out), output_schema_(std::move(output_schema)), pool_(pool) {}
+  TableSinkNodeConsumer(std::shared_ptr<Table>* out, MemoryPool* pool)
+      : out_(out), pool_(pool) {}
+
+  Status Init(const std::shared_ptr<Schema>& schema) override {
+    schema_ = schema;
+    return Status::OK();
+  }
 
   Status Consume(ExecBatch batch) override {
     std::lock_guard<std::mutex> guard(consume_mutex_);
-    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_));
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema_, pool_));
     batches_.push_back(rb);
     return Status::OK();
   }
@@ -286,8 +292,8 @@ struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
 
  private:
   std::shared_ptr<Table>* out_;
-  std::shared_ptr<Schema> output_schema_;
   MemoryPool* pool_;
+  std::shared_ptr<Schema> schema_;
   std::vector<std::shared_ptr<RecordBatch>> batches_;
   std::mutex consume_mutex_;
 };
@@ -298,8 +304,8 @@ static Result<ExecNode*> MakeTableConsumingSinkNode(
   RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TableConsumingSinkNode"));
   const auto& sink_options = checked_cast<const TableSinkNodeOptions&>(options);
   MemoryPool* pool = plan->exec_context()->memory_pool();
-  auto tb_consumer = std::make_shared<TableSinkNodeConsumer>(
-      sink_options.output_table, sink_options.output_schema, pool);
+  auto tb_consumer =
+      std::make_shared<TableSinkNodeConsumer>(sink_options.output_table, pool);
   auto consuming_sink_node_options = ConsumingSinkNodeOptions{tb_consumer};
   return MakeExecNode("consuming_sink", plan, inputs, consuming_sink_node_options);
 }
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 4d5e8e59dc8..4c66053e5be 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -271,22 +271,31 @@ namespace {
 
 class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
  public:
-  DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> schema,
+  DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
                                  std::unique_ptr<internal::DatasetWriter> dataset_writer,
                                  FileSystemDatasetWriteOptions write_options,
                                  std::shared_ptr<util::AsyncToggle> backpressure_toggle)
-      : schema_(std::move(schema)),
+      : custom_metadata_(std::move(custom_metadata)),
         dataset_writer_(std::move(dataset_writer)),
         write_options_(std::move(write_options)),
         backpressure_toggle_(std::move(backpressure_toggle)) {}
 
-  Status Consume(compute::ExecBatch batch) {
+  Status Init(const std::shared_ptr<Schema>& schema) override {
+    if (custom_metadata_) {
+      schema_ = schema->WithMetadata(custom_metadata_);
+    } else {
+      schema_ = schema;
+    }
+    return Status::OK();
+  }
+
+  Status Consume(compute::ExecBatch batch) override {
     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
                           batch.ToRecordBatch(schema_));
     return WriteNextBatch(std::move(record_batch), batch.guarantee);
   }
 
-  Future<> Finish() {
+  Future<> Finish() override {
     RETURN_NOT_OK(task_group_.AddTask([this] { return dataset_writer_->Finish(); }));
     return task_group_.End();
   }
@@ -327,11 +336,12 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
     return Status::OK();
   }
 
-  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<const KeyValueMetadata> custom_metadata_;
   std::unique_ptr<internal::DatasetWriter> dataset_writer_;
   FileSystemDatasetWriteOptions write_options_;
   std::shared_ptr<util::AsyncToggle> backpressure_toggle_;
   util::SerializedAsyncTaskGroup task_group_;
+  std::shared_ptr<Schema> schema_ = nullptr;
 };
 
 }  // namespace
@@ -354,6 +364,10 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
   std::shared_ptr<util::AsyncToggle> backpressure_toggle =
       std::make_shared<util::AsyncToggle>();
 
+  // The projected_schema is currently used by pyarrow to preserve the custom metadata
+  // when reading from a single input file.
+  const auto& custom_metadata = scanner->options()->projected_schema->metadata();
+
   RETURN_NOT_OK(
       compute::Declaration::Sequence(
           {
@@ -362,8 +376,7 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
               {"project",
                compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
               {"write",
-               WriteNodeOptions{write_options, scanner->options()->projected_schema,
-                                backpressure_toggle}},
+               WriteNodeOptions{write_options, custom_metadata, backpressure_toggle}},
           })
           .AddToPlan(plan.get()));
 
@@ -381,8 +394,9 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
 
   const WriteNodeOptions write_node_options =
       checked_cast<const WriteNodeOptions&>(options);
+  const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
+      write_node_options.custom_metadata;
   const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
-  const std::shared_ptr<Schema>& schema = write_node_options.schema;
   const std::shared_ptr<util::AsyncToggle>& backpressure_toggle =
       write_node_options.backpressure_toggle;
 
@@ -391,7 +405,7 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
 
   std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
       std::make_shared<DatasetWritingSinkNodeConsumer>(
-          schema, std::move(dataset_writer), write_options, backpressure_toggle);
+          custom_metadata, std::move(dataset_writer), write_options, backpressure_toggle);
 
   ARROW_ASSIGN_OR_RAISE(
       auto node,
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 07b156778f6..ca8b7e6450f 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -408,14 +408,18 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
 class ARROW_DS_EXPORT WriteNodeOptions : public compute::ExecNodeOptions {
  public:
   explicit WriteNodeOptions(
-      FileSystemDatasetWriteOptions options, std::shared_ptr<Schema> schema,
+      FileSystemDatasetWriteOptions options,
+      std::shared_ptr<const KeyValueMetadata> custom_metadata = NULLPTR,
       std::shared_ptr<util::AsyncToggle> backpressure_toggle = NULLPTR)
       : write_options(std::move(options)),
-        schema(std::move(schema)),
+        custom_metadata(std::move(custom_metadata)),
         backpressure_toggle(std::move(backpressure_toggle)) {}
 
+  /// \brief Options to control how to write the dataset
   FileSystemDatasetWriteOptions write_options;
-  std::shared_ptr<Schema> schema;
+  /// \brief Optional metadata to attach to written batches
+  std::shared_ptr<const KeyValueMetadata> custom_metadata;
+  /// \brief Optional toggle that can be used to pause producers when the node is full
   std::shared_ptr<util::AsyncToggle> backpressure_toggle;
 };
 
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index c07a66e78d3..f43ef730caa 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -452,8 +452,8 @@ engine__internal__SubstraitFromJSON <- function(substrait_json) {
   .Call(`_arrow_engine__internal__SubstraitFromJSON`, substrait_json)
 }
 
-ExecPlan_run_substrait <- function(plan, serialized_plan, out_names) {
-  .Call(`_arrow_ExecPlan_run_substrait`, plan, serialized_plan, out_names)
+ExecPlan_run_substrait <- function(plan, serialized_plan) {
+  .Call(`_arrow_ExecPlan_run_substrait`, plan, serialized_plan)
 }
 
 RecordBatch__cast <- function(batch, schema, options) {
@@ -1943,3 +1943,4 @@ SetIOThreadPoolCapacity <- function(threads) {
 Array__infer_type <- function(x) {
   .Call(`_arrow_Array__infer_type`, x)
 }
+
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 429ee2f50e9..6c1b14036f1 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -305,7 +305,7 @@ ExecNode <- R6Class("ExecNode",
   )
 )
 
-do_exec_plan_substrait <- function(substrait_plan, output_names) {
+do_exec_plan_substrait <- function(substrait_plan) {
   if (is.string(substrait_plan)) {
     substrait_plan <- engine__internal__SubstraitFromJSON(substrait_plan)
   } else if (is.raw(substrait_plan)) {
@@ -315,5 +315,5 @@ do_exec_plan_substrait <- function(substrait_plan, output_names) {
   }
 
   plan <- ExecPlan$create()
-  ExecPlan_run_substrait(plan, substrait_plan, output_names)
+  ExecPlan_run_substrait(plan, substrait_plan)
 }
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 47a65190400..45a883321da 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1781,17 +1781,16 @@ extern "C" SEXP _arrow_engine__internal__SubstraitFromJSON(SEXP substrait_json_s
 
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_ENGINE)
-std::shared_ptr<arrow::Table> ExecPlan_run_substrait(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<arrow::Buffer>& serialized_plan, cpp11::strings out_names);
-extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp, SEXP out_names_sexp){
+std::shared_ptr<arrow::Table> ExecPlan_run_substrait(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<arrow::Buffer>& serialized_plan);
+extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
 	arrow::r::Input<const std::shared_ptr<arrow::Buffer>&>::type serialized_plan(serialized_plan_sexp);
-	arrow::r::Input<cpp11::strings>::type out_names(out_names_sexp);
-	return cpp11::as_sexp(ExecPlan_run_substrait(plan, serialized_plan, out_names));
+	return cpp11::as_sexp(ExecPlan_run_substrait(plan, serialized_plan));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp, SEXP out_names_sexp){
+extern "C" SEXP _arrow_ExecPlan_run_substrait(SEXP plan_sexp, SEXP serialized_plan_sexp){
 	Rf_error("Cannot call ExecPlan_run_substrait(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
@@ -7708,12 +7707,12 @@ return Rf_ScalarLogical(
 );
 }
 
 static const R_CallMethodDef CallEntries[] = {
-	{ "_arrow_available", (DL_FUNC)& _arrow_available, 0 },
-	{ "_dataset_available", (DL_FUNC)& _dataset_available, 0 },
-	{ "_engine_available", (DL_FUNC)& _engine_available, 0 },
-	{ "_parquet_available", (DL_FUNC)& _parquet_available, 0 },
-	{ "_s3_available", (DL_FUNC)& _s3_available, 0 },
-	{ "_json_available", (DL_FUNC)& _json_available, 0 },
+{ "_arrow_available", (DL_FUNC)& _arrow_available, 0 },
+{ "_dataset_available", (DL_FUNC)& _dataset_available, 0 },
+{ "_engine_available", (DL_FUNC)& _engine_available, 0 },
+{ "_parquet_available", (DL_FUNC)& _parquet_available, 0 },
+{ "_s3_available", (DL_FUNC)& _s3_available, 0 },
+{ "_json_available", (DL_FUNC)& _json_available, 0 },
 	{ "_arrow_test_SET_STRING_ELT", (DL_FUNC) &_arrow_test_SET_STRING_ELT, 1},
 	{ "_arrow_is_arrow_altrep", (DL_FUNC) &_arrow_is_arrow_altrep, 1},
 	{ "_arrow_Array__Slice1", (DL_FUNC) &_arrow_Array__Slice1, 2},
@@ -7827,7 +7826,7 @@ static const R_CallMethodDef CallEntries[] = {
 	{ "_arrow_ExecNode_TableSourceNode", (DL_FUNC) &_arrow_ExecNode_TableSourceNode, 2},
 	{ "_arrow_engine__internal__SubstraitToJSON", (DL_FUNC) &_arrow_engine__internal__SubstraitToJSON, 1},
 	{ "_arrow_engine__internal__SubstraitFromJSON", (DL_FUNC) &_arrow_engine__internal__SubstraitFromJSON, 1},
-	{ "_arrow_ExecPlan_run_substrait", (DL_FUNC) &_arrow_ExecPlan_run_substrait, 3},
+	{ "_arrow_ExecPlan_run_substrait", (DL_FUNC) &_arrow_ExecPlan_run_substrait, 2},
 	{ "_arrow_RecordBatch__cast", (DL_FUNC) &_arrow_RecordBatch__cast, 3},
 	{ "_arrow_Table__cast", (DL_FUNC) &_arrow_Table__cast, 3},
 	{ "_arrow_compute__CallFunction", (DL_FUNC) &_arrow_compute__CallFunction, 3},
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 0e52c0a3981..a1a679144da 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -299,27 +299,15 @@ std::shared_ptr<compute::ExecNode> ExecNode_TableSourceNode(
 // a RecordBatchReader output (ARROW-15849)
 class AccumulatingConsumer : public compute::SinkNodeConsumer {
  public:
-  explicit AccumulatingConsumer(const std::vector<std::string>& schema_names)
-      : schema_names_(schema_names) {}
-
   const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches() { return batches_; }
 
-  arrow::Status Consume(compute::ExecBatch batch) override {
-    arrow::SchemaBuilder builder;
-    auto descriptors = batch.GetDescriptors();
-    for (int64_t i = 0; i < schema_names_.size(); i++) {
-      if (i == (descriptors.size() - 1)) {
-        break;
-      }
-
-      RETURN_NOT_OK(builder.AddField(
-          std::make_shared<arrow::Field>(schema_names_[i], descriptors[i].type)));
-    }
-
-    auto schema = builder.Finish();
-    RETURN_NOT_OK(schema);
+  arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
+    schema_ = schema;
+    return arrow::Status::OK();
+  }
 
-    auto record_batch = batch.ToRecordBatch(schema.ValueUnsafe());
+  arrow::Status Consume(compute::ExecBatch batch) override {
+    auto record_batch = batch.ToRecordBatch(schema_);
     ARROW_RETURN_NOT_OK(record_batch);
     batches_.push_back(record_batch.ValueUnsafe());
 
@@ -329,7 +317,7 @@ class AccumulatingConsumer : public compute::SinkNodeConsumer {
   arrow::Future<> Finish() override { return arrow::Future<>::MakeFinished(); }
 
  private:
-  std::vector<std::string> schema_names_;
+  std::shared_ptr<arrow::Schema> schema_;
   std::vector<std::shared_ptr<arrow::RecordBatch>> batches_;
 };
 
@@ -350,15 +338,11 @@ std::shared_ptr<arrow::Buffer> engine__internal__SubstraitFromJSON(
 // [[engine::export]]
 std::shared_ptr<arrow::Table> ExecPlan_run_substrait(
     const std::shared_ptr<compute::ExecPlan>& plan,
-    const std::shared_ptr<arrow::Buffer>& serialized_plan, cpp11::strings out_names) {
+    const std::shared_ptr<arrow::Buffer>& serialized_plan) {
   std::vector<std::shared_ptr<AccumulatingConsumer>> consumers;
 
-  std::vector<std::string> out_names_string;
-  for (const auto& item : out_names) {
-    out_names_string.push_back(item);
-  }
-
   std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] {
-    consumers.emplace_back(new AccumulatingConsumer(out_names_string));
+    consumers.emplace_back(new AccumulatingConsumer());
     return consumers.back();
   };
diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R
index 8293267d7cc..3ef2d3aa1cb 100644
--- a/r/tests/testthat/test-query-engine.R
+++ b/r/tests/testthat/test-query-engine.R
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+library(dplyr, warn.conflicts = FALSE)
+
 test_that("do_exec_plan_substrait can evaluate a simple plan", {
   skip_if_not_available("engine")
 
@@ -55,9 +57,10 @@ test_that("do_exec_plan_substrait can evaluate a simple plan", {
   substrait_json_roundtrip <- engine__internal__SubstraitToJSON(substrait_buffer)
   expect_match(substrait_json_roundtrip, tf, fixed = TRUE)
 
-  result <- do_exec_plan_substrait(substrait_json, names(df))
+  result <- do_exec_plan_substrait(substrait_json)
 
   expect_identical(
-    tibble::as_tibble(result),
+    # TODO(ARROW-15585) The "select(i, b)" should not be needed
+    tibble::as_tibble(result) %>% select(i, b),
     tibble::as_tibble(df)
   )
 })
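
Note for implementors of custom consumers (illustrative commentary, not part of the patch): after this change, every SinkNodeConsumer subclass must override Init, which receives the plan's finalized output schema exactly once, before any call to Consume. Saving the schema in Init is what lets TableSinkNodeOptions and WriteNodeOptions drop their schema arguments above. Below is a minimal sketch of a consumer written against the new interface; the class name CountingConsumer and the row counting are hypothetical, only the three overridden methods come from the patch:

    // Sketch only: a consumer that saves the schema in Init and counts rows.
    #include <atomic>
    #include <iostream>
    #include <memory>

    #include "arrow/compute/exec/options.h"

    namespace cp = ::arrow::compute;

    class CountingConsumer : public cp::SinkNodeConsumer {
     public:
      // Called once as the plan starts, before any Consume call
      arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
        schema_ = schema;  // save the schema so batches can be interpreted later
        return arrow::Status::OK();
      }

      arrow::Status Consume(cp::ExecBatch batch) override {
        num_rows_ += batch.length;
        return arrow::Status::OK();
      }

      arrow::Future<> Finish() override {
        std::cout << "consumed " << num_rows_ << " rows" << std::endl;
        return arrow::Future<>::MakeFinished();
      }

     private:
      std::shared_ptr<arrow::Schema> schema_;
      std::atomic<int64_t> num_rows_{0};
    };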