diff --git a/r/R/query-engine.R b/r/R/query-engine.R index fff2b2cf6c5..8f8514ad1cb 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -18,7 +18,7 @@ do_exec_plan <- function(.data) { plan <- ExecPlan$create() final_node <- plan$Build(.data) - tab <- plan$Run(final_node) + tab <- plan$Run(final_node)$read_table() # If arrange() created $temp_columns, make sure to omit them from the result # We can't currently handle this in the ExecPlan itself because sorting diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index f33b81c08f0..0e6aebeca2c 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1094,7 +1094,7 @@ extern "C" SEXP _arrow_ExecPlan_create(SEXP use_threads_sexp){ // compute-exec.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options); +std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options); extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp){ BEGIN_CPP11 arrow::r::Input&>::type plan(plan_sexp); diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index ea80c8587b0..9404e016c98 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -55,7 +55,7 @@ std::shared_ptr MakeExecNodeOrStop( } // [[arrow::export]] -std::shared_ptr ExecPlan_run( +std::shared_ptr ExecPlan_run( const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options) { // For now, don't require R to construct SinkNodes. @@ -77,11 +77,21 @@ std::shared_ptr ExecPlan_run( StopIfNotOk(plan->Validate()); StopIfNotOk(plan->StartProducing()); - std::shared_ptr sink_reader = compute::MakeGeneratorReader( - final_node->output_schema(), std::move(sink_gen), gc_memory_pool()); - - plan->finished().Wait(); - return ValueOrStop(arrow::Table::FromRecordBatchReader(sink_reader.get())); + // If the generator is destroyed before being completely drained, inform plan + std::shared_ptr stop_producing{nullptr, [plan](...) { + bool not_finished_yet = + plan->finished().TryAddCallback([&plan] { + return [plan](const arrow::Status&) {}; + }); + + if (not_finished_yet) { + plan->StopProducing(); + } + }}; + + return compute::MakeGeneratorReader( + final_node->output_schema(), + [stop_producing, plan, sink_gen] { return sink_gen(); }, gc_memory_pool()); } #if defined(ARROW_R_WITH_DATASET)