From f9f696d7df1fd59bd7b4e74237b29951636114c8 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 25 Jul 2022 13:01:34 -0300 Subject: [PATCH 01/34] consolidate exec plan reader into a dedicated reader class --- r/R/arrowExports.R | 4 +- r/R/query-engine.R | 1 + r/src/arrowExports.cpp | 9 +-- r/src/compute-exec.cpp | 157 +++++++++++++++++++++++------------------ 4 files changed, 95 insertions(+), 76 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index ab3358d6664..0c756a8e66c 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -420,8 +420,8 @@ ExecNode_output_schema <- function(node) { .Call(`_arrow_ExecNode_output_schema`, node) } -ExecPlan_BuildAndShow <- function(plan, final_node, sort_options, head) { - .Call(`_arrow_ExecPlan_BuildAndShow`, plan, final_node, sort_options, head) +ExecPlan_BuildAndShow <- function(plan, final_node, sort_options, metadata, head) { + .Call(`_arrow_ExecPlan_BuildAndShow`, plan, final_node, sort_options, metadata, head) } ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) { diff --git a/r/R/query-engine.R b/r/R/query-engine.R index c132b291b87..2d2249aba61 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -309,6 +309,7 @@ ExecPlan <- R6Class("ExecPlan", self, node, sorting, + character(), select_k ) }, diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index adb6636e9ee..b1df45293ff 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -910,14 +910,15 @@ BEGIN_CPP11 END_CPP11 } // compute-exec.cpp -std::string ExecPlan_BuildAndShow(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, int64_t head); -extern "C" SEXP _arrow_ExecPlan_BuildAndShow(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP head_sexp){ +std::string ExecPlan_BuildAndShow(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head); +extern "C" SEXP _arrow_ExecPlan_BuildAndShow(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){ BEGIN_CPP11 arrow::r::Input&>::type plan(plan_sexp); arrow::r::Input&>::type final_node(final_node_sexp); arrow::r::Input::type sort_options(sort_options_sexp); + arrow::r::Input::type metadata(metadata_sexp); arrow::r::Input::type head(head_sexp); - return cpp11::as_sexp(ExecPlan_BuildAndShow(plan, final_node, sort_options, head)); + return cpp11::as_sexp(ExecPlan_BuildAndShow(plan, final_node, sort_options, metadata, head)); END_CPP11 } // compute-exec.cpp @@ -5290,7 +5291,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExecPlan_read_table", (DL_FUNC) &_arrow_ExecPlan_read_table, 5}, { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, - { "_arrow_ExecPlan_BuildAndShow", (DL_FUNC) &_arrow_ExecPlan_BuildAndShow, 4}, + { "_arrow_ExecPlan_BuildAndShow", (DL_FUNC) &_arrow_ExecPlan_BuildAndShow, 5}, { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14}, { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index abcb418a2c2..a81ad9521ac 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -56,14 +56,86 @@ std::shared_ptr MakeExecNodeOrStop( }); } -std::pair, std::shared_ptr> -ExecPlan_prepare(const std::shared_ptr& plan, - const std::shared_ptr& final_node, - cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) { - // a section of this code is copied and used in ExecPlan_BuildAndShow - the 2 need - // to be in sync - // Start of chunk used in ExecPlan_BuildAndShow +class ExecPlanReader : public arrow::RecordBatchReader { + public: + ExecPlanReader( + const std::shared_ptr& plan, + const std::shared_ptr& schema, + arrow::AsyncGenerator> sink_gen) + : schema_(schema), plan_(plan), sink_gen_(sink_gen), status_(0) {} + + std::shared_ptr schema() const { return schema_; } + + arrow::Status ReadNext(std::shared_ptr* batch_out) { + // TODO(ARROW-11841) check a StopToken to potentially cancel this plan + + // If this is the first batch getting pulled, tell the exec plan to + // start producing + if (status_ == 0) { + ARROW_RETURN_NOT_OK(StartProducing()); + } + + // If we've closed the reader, this is invalid + if (status_ == 2) { + return arrow::Status::Invalid("ExecPlanReader has been closed"); + } + + auto out = sink_gen_().result(); + ARROW_RETURN_NOT_OK(out); + if (out.ValueUnsafe()) { + auto batch_result = out.ValueUnsafe()->ToRecordBatch(schema_, gc_memory_pool()); + ARROW_RETURN_NOT_OK(batch_result); + *batch_out = batch_result.ValueUnsafe(); + } else { + batch_out->reset(); + } + + return arrow::Status::OK(); + } + + arrow::Status Close() { + if (status_ == 2) { + return arrow::Status::Invalid("ExecPlanReader has been closed"); + } + + StopProducing(); + return arrow::Status::OK(); + } + const std::shared_ptr& Plan() { return plan_; } + + ~ExecPlanReader() { StopProducing(); } + + private: + std::shared_ptr schema_; + std::shared_ptr plan_; + arrow::AsyncGenerator> sink_gen_; + int status_; + + arrow::Status StartProducing() { + ARROW_RETURN_NOT_OK(plan_->StartProducing()); + status_ = 1; + return arrow::Status::OK(); + } + + void StopProducing() { + if (status_ == 1) { + bool not_finished_yet = + plan_->finished().TryAddCallback([] { return [](const arrow::Status&) {}; }); + + if (not_finished_yet) { + plan_->StopProducing(); + } + + status_ = 2; + } + } +}; + +std::shared_ptr ExecPlan_prepare( + const std::shared_ptr& plan, + const std::shared_ptr& final_node, cpp11::list sort_options, + cpp11::strings metadata, int64_t head = -1) { // For now, don't require R to construct SinkNodes. // Instead, just pass the node we should collect as an argument. arrow::AsyncGenerator> sink_gen; @@ -92,22 +164,8 @@ ExecPlan_prepare(const std::shared_ptr& plan, compute::SinkNodeOptions{&sink_gen}); } - // End of chunk used in ExecPlan_BuildAndShow - StopIfNotOk(plan->Validate()); - // If the generator is destroyed before being completely drained, inform plan - std::shared_ptr stop_producing{nullptr, [plan](...) { - bool not_finished_yet = - plan->finished().TryAddCallback([&plan] { - return [plan](const arrow::Status&) {}; - }); - - if (not_finished_yet) { - plan->StopProducing(); - } - }}; - // Attach metadata to the schema auto out_schema = final_node->output_schema(); if (metadata.size() > 0) { @@ -115,13 +173,7 @@ ExecPlan_prepare(const std::shared_ptr& plan, out_schema = out_schema->WithMetadata(kv); } - std::pair, std::shared_ptr> - out; - out.first = plan; - out.second = compute::MakeGeneratorReader( - out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); }, - gc_memory_pool()); - return out; + return std::make_shared(plan, out_schema, std::move(sink_gen)); } // [[arrow::export]] @@ -129,9 +181,7 @@ std::shared_ptr ExecPlan_run( const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) { - auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); - StopIfNotOk(prepared_plan.first->StartProducing()); - return prepared_plan.second; + return ExecPlan_prepare(plan, final_node, sort_options, metadata, head); } // [[arrow::export]] @@ -139,12 +189,10 @@ std::shared_ptr ExecPlan_read_table( const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) { - auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); - + auto reader = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); auto result = RunWithCapturedRIfPossible>( [&]() -> arrow::Result> { - ARROW_RETURN_NOT_OK(prepared_plan.first->StartProducing()); - return prepared_plan.second->ToTable(); + return reader->ToTable(); }); return ValueOrStop(result); @@ -164,41 +212,10 @@ std::shared_ptr ExecNode_output_schema( // [[arrow::export]] std::string ExecPlan_BuildAndShow(const std::shared_ptr& plan, const std::shared_ptr& final_node, - cpp11::list sort_options, int64_t head = -1) { - // a section of this code is copied from ExecPlan_prepare - the 2 need to be in sync - // Start of chunk copied from ExecPlan_prepare - - // For now, don't require R to construct SinkNodes. - // Instead, just pass the node we should collect as an argument. - arrow::AsyncGenerator> sink_gen; - - // Sorting uses a different sink node; there is no general sort yet - if (sort_options.size() > 0) { - if (head >= 0) { - // Use the SelectK node to take only what we need - MakeExecNodeOrStop( - "select_k_sink", plan.get(), {final_node.get()}, - compute::SelectKSinkNodeOptions{ - arrow::compute::SelectKOptions( - head, std::dynamic_pointer_cast( - make_compute_options("sort_indices", sort_options)) - ->sort_keys), - &sink_gen}); - } else { - MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()}, - compute::OrderBySinkNodeOptions{ - *std::dynamic_pointer_cast( - make_compute_options("sort_indices", sort_options)), - &sink_gen}); - } - } else { - MakeExecNodeOrStop("sink", plan.get(), {final_node.get()}, - compute::SinkNodeOptions{&sink_gen}); - } - - // End of chunk copied from ExecPlan_prepare - - return plan->ToString(); + cpp11::list sort_options, cpp11::strings metadata, + int64_t head = -1) { + auto reader = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); + return reader->Plan()->ToString(); } #if defined(ARROW_R_WITH_DATASET) From dbd2d61545d155c46a2c8ed18c69a61f7d78c5fa Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 25 Jul 2022 14:02:52 -0300 Subject: [PATCH 02/34] fix the head thing --- r/src/recordbatchreader.cpp | 32 +++++++++++++++++++++----------- r/tests/testthat/test-compute.R | 31 ++++++++++--------------------- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index c571d282da1..1d7a7299f8d 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -108,22 +108,32 @@ std::shared_ptr RecordBatchReader__from_Table( // [[arrow::export]] std::shared_ptr Table__from_RecordBatchReader( const std::shared_ptr& reader) { - return ValueOrStop(reader->ToTable()); + auto result = RunWithCapturedRIfPossible>( + [&]() { return reader->ToTable(); }); + + return ValueOrStop(result); } // [[arrow::export]] std::shared_ptr RecordBatchReader__Head( const std::shared_ptr& reader, int64_t num_rows) { - std::vector> batches; - std::shared_ptr this_batch; - while (num_rows > 0) { - this_batch = ValueOrStop(reader->Next()); - if (this_batch == nullptr) break; - batches.push_back(this_batch->Slice(0, num_rows)); - num_rows -= this_batch->num_rows(); - } - return ValueOrStop( - arrow::RecordBatchReader::Make(std::move(batches), reader->schema())); + auto result = RunWithCapturedRIfPossible>( + [&]() -> arrow::Result> { + std::vector> batches; + arrow::Result> this_batch; + + while (num_rows > 0) { + this_batch = reader->Next(); + ARROW_RETURN_NOT_OK(this_batch); + if (this_batch.ValueUnsafe() == nullptr) break; + batches.push_back(this_batch.ValueUnsafe()->Slice(0, num_rows)); + num_rows -= this_batch.ValueUnsafe()->num_rows(); + } + + return arrow::RecordBatchReader::Make(std::move(batches), reader->schema()); + }); + + return ValueOrStop(result); } // -------- RecordBatchStreamReader diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index 5821c0fa2df..f43f588760f 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -282,7 +282,7 @@ test_that("user-defined functions work during multi-threaded execution", { expect_identical(result2$fun_result, example_df$value * 32) }) -test_that("user-defined error when called from an unsupported context", { +test_that("nested exec plans can contain user-defined functions", { skip_if_not_available("dataset") skip_if_not(CanRunWithCapturedR()) @@ -313,24 +313,13 @@ test_that("user-defined error when called from an unsupported context", { dplyr::collect() } - if (identical(tolower(Sys.info()[["sysname"]]), "windows")) { - expect_equal( - stream_plan_with_udf(), - record_batch(a = 1:1000) %>% - dplyr::mutate(b = times_32(a)) %>% - dplyr::collect(as_data_frame = FALSE) - ) - - result <- collect_plan_with_head() - expect_equal(nrow(result), 11) - } else { - expect_error( - stream_plan_with_udf(), - "Call to R \\(.*?\\) from a non-R thread from an unsupported context" - ) - expect_error( - collect_plan_with_head(), - "Call to R \\(.*?\\) from a non-R thread from an unsupported context" - ) - } + expect_equal( + stream_plan_with_udf(), + record_batch(a = 1:1000) %>% + dplyr::mutate(b = times_32(a)) %>% + dplyr::collect(as_data_frame = FALSE) + ) + + result <- collect_plan_with_head() + expect_equal(nrow(result), 11) }) From 095b3f0bbed31113b1d1663da3ec7ab33145fa90 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 25 Jul 2022 14:34:37 -0300 Subject: [PATCH 03/34] undo some kludges introduced --- r/R/arrowExports.R | 4 ---- r/R/query-engine.R | 37 +++++++++---------------------------- r/R/table.R | 9 +-------- r/src/arrowExports.cpp | 10 ---------- r/src/compute-exec.cpp | 5 ----- r/src/recordbatchreader.cpp | 8 +++++++- 6 files changed, 17 insertions(+), 56 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 0c756a8e66c..dbb846bdb20 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -412,10 +412,6 @@ ExecPlan_read_table <- function(plan, final_node, sort_options, metadata, head) .Call(`_arrow_ExecPlan_read_table`, plan, final_node, sort_options, metadata, head) } -ExecPlan_StopProducing <- function(plan) { - invisible(.Call(`_arrow_ExecPlan_StopProducing`, plan)) -} - ExecNode_output_schema <- function(node) { .Call(`_arrow_ExecNode_output_schema`, node) } diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 2d2249aba61..db4c1ba0012 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -194,13 +194,11 @@ ExecPlan <- R6Class("ExecPlan", } node }, - Run = function(node, as_table = FALSE) { - # a section of this code is used by `BuildAndShow()` too - the 2 need to be in sync - # Start of chunk used in `BuildAndShow()` + Run = function(node) { assert_is(node, "ExecNode") # Sorting and head/tail (if sorted) are handled in the SinkNode, - # created in ExecPlan_run + # created in ExecPlan_build sorting <- node$extras$sort %||% list() select_k <- node$extras$head %||% -1L has_sorting <- length(sorting) > 0 @@ -214,16 +212,7 @@ ExecPlan <- R6Class("ExecPlan", sorting$orders <- as.integer(sorting$orders) } - # End of chunk used in `BuildAndShow()` - - # If we are going to return a Table anyway, we do this in one step and - # entirely in one C++ call to ensure that we can execute user-defined - # functions from the worker threads spawned by the ExecPlan. If not, we - # use ExecPlan_run which returns a RecordBatchReader that can be - # manipulated in R code (but that right now won't work with - # user-defined functions). - exec_fun <- if (as_table) ExecPlan_read_table else ExecPlan_run - out <- exec_fun( + out <- ExecPlan_run( self, node, sorting, @@ -239,19 +228,16 @@ ExecPlan <- R6Class("ExecPlan", # TODO(ARROW-16628): handle limit in ExecNode slice_size <- node$extras$head %||% node$extras$tail if (!is.null(slice_size)) { - out <- head(out, slice_size) - # We already have everything we need for the head, so StopProducing - self$Stop() + out_head <- head(out, slice_size) + + out <- out_head } } else if (!is.null(node$extras$tail)) { # TODO(ARROW-16630): proper BottomK support # Reverse the row order to get back what we expect out <- as_arrow_table(out) out <- out[rev(seq_len(nrow(out))), , drop = FALSE] - # Put back into RBR - if (!as_table) { - out <- as_record_batch_reader(out) - } + out <- as_record_batch_reader(out) } # If arrange() created $temp_columns, make sure to omit them from the result @@ -261,11 +247,7 @@ ExecPlan <- R6Class("ExecPlan", if (length(node$extras$sort$temp_columns) > 0) { tab <- as_arrow_table(out) tab <- tab[, setdiff(names(tab), node$extras$sort$temp_columns), drop = FALSE] - if (!as_table) { - out <- as_record_batch_reader(tab) - } else { - out <- tab - } + out <- as_record_batch_reader(tab) } out @@ -312,8 +294,7 @@ ExecPlan <- R6Class("ExecPlan", character(), select_k ) - }, - Stop = function() ExecPlan_StopProducing(self) + } ) ) # nolint end. diff --git a/r/R/table.R b/r/R/table.R index d7e276415c5..1a5c96ef50a 100644 --- a/r/R/table.R +++ b/r/R/table.R @@ -331,12 +331,5 @@ as_arrow_table.arrow_dplyr_query <- function(x, ...) { # See query-engine.R for ExecPlan/Nodes plan <- ExecPlan$create() final_node <- plan$Build(x) - - run_with_event_loop <- identical( - Sys.getenv("R_ARROW_COLLECT_WITH_UDF", ""), - "true" - ) - - result <- plan$Run(final_node, as_table = run_with_event_loop) - as_arrow_table(result) + as_arrow_table(plan$Run(final_node)) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index b1df45293ff..991b7a22e08 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -893,15 +893,6 @@ BEGIN_CPP11 END_CPP11 } // compute-exec.cpp -void ExecPlan_StopProducing(const std::shared_ptr& plan); -extern "C" SEXP _arrow_ExecPlan_StopProducing(SEXP plan_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type plan(plan_sexp); - ExecPlan_StopProducing(plan); - return R_NilValue; -END_CPP11 -} -// compute-exec.cpp std::shared_ptr ExecNode_output_schema(const std::shared_ptr& node); extern "C" SEXP _arrow_ExecNode_output_schema(SEXP node_sexp){ BEGIN_CPP11 @@ -5289,7 +5280,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExecPlan_create", (DL_FUNC) &_arrow_ExecPlan_create, 1}, { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 5}, { "_arrow_ExecPlan_read_table", (DL_FUNC) &_arrow_ExecPlan_read_table, 5}, - { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, { "_arrow_ExecPlan_BuildAndShow", (DL_FUNC) &_arrow_ExecPlan_BuildAndShow, 5}, { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index a81ad9521ac..992e1bae6cc 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -198,11 +198,6 @@ std::shared_ptr ExecPlan_read_table( return ValueOrStop(result); } -// [[arrow::export]] -void ExecPlan_StopProducing(const std::shared_ptr& plan) { - plan->StopProducing(); -} - // [[arrow::export]] std::shared_ptr ExecNode_output_schema( const std::shared_ptr& node) { diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index 1d7a7299f8d..78535769ef2 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -38,7 +38,11 @@ std::shared_ptr RecordBatchReader__ReadNext( // [[arrow::export]] cpp11::list RecordBatchReader__batches( const std::shared_ptr& reader) { - return arrow::r::to_r_list(ValueOrStop(reader->ToRecordBatches())); + auto result = RunWithCapturedRIfPossible( + [&]() { return reader->ToRecordBatches(); }); + + arrow::StopIfNotOk(reader->Close()); + return arrow::r::to_r_list(ValueOrStop(result)); } // [[arrow::export]] @@ -111,6 +115,7 @@ std::shared_ptr Table__from_RecordBatchReader( auto result = RunWithCapturedRIfPossible>( [&]() { return reader->ToTable(); }); + arrow::StopIfNotOk(reader->Close()); return ValueOrStop(result); } @@ -130,6 +135,7 @@ std::shared_ptr RecordBatchReader__Head( num_rows -= this_batch.ValueUnsafe()->num_rows(); } + ARROW_RETURN_NOT_OK(reader->Close()); return arrow::RecordBatchReader::Make(std::move(batches), reader->schema()); }); From 4a32870cda729b0d71e9ac3fce757f99ec9b55be Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 25 Jul 2022 15:04:43 -0300 Subject: [PATCH 04/34] explicitly close readers where possible --- r/R/arrowExports.R | 4 ++++ r/R/query-engine.R | 2 +- r/R/record-batch-reader.R | 1 + r/src/arrowExports.cpp | 10 ++++++++++ r/src/recordbatchreader.cpp | 5 +++++ 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index dbb846bdb20..8151e565c09 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1724,6 +1724,10 @@ RecordBatchReader__schema <- function(reader) { .Call(`_arrow_RecordBatchReader__schema`, reader) } +RecordBatchReader__Close <- function(reader) { + invisible(.Call(`_arrow_RecordBatchReader__Close`, reader)) +} + RecordBatchReader__ReadNext <- function(reader) { .Call(`_arrow_RecordBatchReader__ReadNext`, reader) } diff --git a/r/R/query-engine.R b/r/R/query-engine.R index db4c1ba0012..2bcb69c26f5 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -229,7 +229,7 @@ ExecPlan <- R6Class("ExecPlan", slice_size <- node$extras$head %||% node$extras$tail if (!is.null(slice_size)) { out_head <- head(out, slice_size) - + out$Close() out <- out_head } } else if (!is.null(node$extras$tail)) { diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R index 3a985d8abce..e1dd52ed715 100644 --- a/r/R/record-batch-reader.R +++ b/r/R/record-batch-reader.R @@ -98,6 +98,7 @@ RecordBatchReader <- R6Class("RecordBatchReader", read_next_batch = function() RecordBatchReader__ReadNext(self), batches = function() RecordBatchReader__batches(self), read_table = function() Table__from_RecordBatchReader(self), + Close = function() RecordBatchReader__Close(self), export_to_c = function(stream_ptr) ExportRecordBatchReader(self, stream_ptr), ToString = function() self$schema$ToString() ), diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 991b7a22e08..448a500e7df 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -4458,6 +4458,15 @@ BEGIN_CPP11 END_CPP11 } // recordbatchreader.cpp +void RecordBatchReader__Close(const std::shared_ptr& reader); +extern "C" SEXP _arrow_RecordBatchReader__Close(SEXP reader_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type reader(reader_sexp); + RecordBatchReader__Close(reader); + return R_NilValue; +END_CPP11 +} +// recordbatchreader.cpp std::shared_ptr RecordBatchReader__ReadNext(const std::shared_ptr& reader); extern "C" SEXP _arrow_RecordBatchReader__ReadNext(SEXP reader_sexp){ BEGIN_CPP11 @@ -5608,6 +5617,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_RecordBatch__from_arrays", (DL_FUNC) &_arrow_RecordBatch__from_arrays, 2}, { "_arrow_RecordBatch__ReferencedBufferSize", (DL_FUNC) &_arrow_RecordBatch__ReferencedBufferSize, 1}, { "_arrow_RecordBatchReader__schema", (DL_FUNC) &_arrow_RecordBatchReader__schema, 1}, + { "_arrow_RecordBatchReader__Close", (DL_FUNC) &_arrow_RecordBatchReader__Close, 1}, { "_arrow_RecordBatchReader__ReadNext", (DL_FUNC) &_arrow_RecordBatchReader__ReadNext, 1}, { "_arrow_RecordBatchReader__batches", (DL_FUNC) &_arrow_RecordBatchReader__batches, 1}, { "_arrow_RecordBatchReader__from_batches", (DL_FUNC) &_arrow_RecordBatchReader__from_batches, 2}, diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index 78535769ef2..87805204ae2 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -27,6 +27,11 @@ std::shared_ptr RecordBatchReader__schema( return reader->schema(); } +// [[arrow::export]] +void RecordBatchReader__Close(const std::shared_ptr& reader) { + return arrow::StopIfNotOk(reader->Close()); +} + // [[arrow::export]] std::shared_ptr RecordBatchReader__ReadNext( const std::shared_ptr& reader) { From 24bc1cdb1304c40aa13213e6b9bb526f6c6c651e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 25 Jul 2022 15:05:47 -0300 Subject: [PATCH 05/34] just kidding don't do that --- r/R/query-engine.R | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 2bcb69c26f5..bc07e4fca61 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -228,9 +228,7 @@ ExecPlan <- R6Class("ExecPlan", # TODO(ARROW-16628): handle limit in ExecNode slice_size <- node$extras$head %||% node$extras$tail if (!is.null(slice_size)) { - out_head <- head(out, slice_size) - out$Close() - out <- out_head + out <- head(out, slice_size) } } else if (!is.null(node$extras$tail)) { # TODO(ARROW-16630): proper BottomK support From 154282938941dee2d6d18a5be7b0bad0d7f1cae9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 11 Aug 2022 16:08:45 -0300 Subject: [PATCH 06/34] just a shot in the dark --- r/src/compute-exec.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 992e1bae6cc..a284ce9928f 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -119,7 +119,7 @@ class ExecPlanReader : public arrow::RecordBatchReader { } void StopProducing() { - if (status_ == 1) { + if (status_ >= 1) { bool not_finished_yet = plan_->finished().TryAddCallback([] { return [](const arrow::Status&) {}; }); From 9e9170fd6e7084d282dcf2fd5e9225519946f382 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 11 Aug 2022 16:38:16 -0300 Subject: [PATCH 07/34] try a new strategy for stopproducing --- r/src/compute-exec.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index a284ce9928f..fa8cdf0fdb7 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -120,8 +120,9 @@ class ExecPlanReader : public arrow::RecordBatchReader { void StopProducing() { if (status_ >= 1) { - bool not_finished_yet = - plan_->finished().TryAddCallback([] { return [](const arrow::Status&) {}; }); + std::shared_ptr plan(plan_); + bool not_finished_yet = plan_->finished().TryAddCallback( + [&plan] { return [plan](const arrow::Status&) {}; }); if (not_finished_yet) { plan_->StopProducing(); From 30a8684be455f3406abf5e891fa47a705c0fc2a7 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 15 Aug 2022 09:40:21 -0300 Subject: [PATCH 08/34] try a more raw approach to stoppinh production --- r/src/compute-exec.cpp | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index fa8cdf0fdb7..5a50151f50b 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -94,18 +94,12 @@ class ExecPlanReader : public arrow::RecordBatchReader { } arrow::Status Close() { - if (status_ == 2) { - return arrow::Status::Invalid("ExecPlanReader has been closed"); - } - StopProducing(); return arrow::Status::OK(); } const std::shared_ptr& Plan() { return plan_; } - ~ExecPlanReader() { StopProducing(); } - private: std::shared_ptr schema_; std::shared_ptr plan_; @@ -119,15 +113,8 @@ class ExecPlanReader : public arrow::RecordBatchReader { } void StopProducing() { - if (status_ >= 1) { - std::shared_ptr plan(plan_); - bool not_finished_yet = plan_->finished().TryAddCallback( - [&plan] { return [plan](const arrow::Status&) {}; }); - - if (not_finished_yet) { - plan_->StopProducing(); - } - + if (status_ == 1 && !plan_->finished().is_finished()) { + plan_->StopProducing(); status_ = 2; } } From 8225dc4004ba83850182a3129997832a7d1a8485 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 15 Aug 2022 09:44:22 -0300 Subject: [PATCH 09/34] always set status --- r/src/compute-exec.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 5a50151f50b..1ef6e9da02b 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -115,8 +115,9 @@ class ExecPlanReader : public arrow::RecordBatchReader { void StopProducing() { if (status_ == 1 && !plan_->finished().is_finished()) { plan_->StopProducing(); - status_ = 2; } + + status_ = 2; } }; From d69e365a0b34b33f60629c1aa455cb654357cd8a Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 15 Aug 2022 12:45:31 -0300 Subject: [PATCH 10/34] another approach to stopping production --- r/src/compute-exec.cpp | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 1ef6e9da02b..9e33efa75aa 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -93,10 +93,7 @@ class ExecPlanReader : public arrow::RecordBatchReader { return arrow::Status::OK(); } - arrow::Status Close() { - StopProducing(); - return arrow::Status::OK(); - } + arrow::Status Close() { return arrow::Status::OK(); } const std::shared_ptr& Plan() { return plan_; } @@ -107,18 +104,22 @@ class ExecPlanReader : public arrow::RecordBatchReader { int status_; arrow::Status StartProducing() { + // If the generator is destroyed before being completely drained, inform plan + std::shared_ptr plan(plan_); + std::shared_ptr stop_producing{nullptr, [plan](...) { + bool not_finished_yet = + plan->finished().TryAddCallback([&plan] { + return [plan](const arrow::Status&) {}; + }); + + if (not_finished_yet) { + plan->StopProducing(); + } + }}; ARROW_RETURN_NOT_OK(plan_->StartProducing()); status_ = 1; return arrow::Status::OK(); } - - void StopProducing() { - if (status_ == 1 && !plan_->finished().is_finished()) { - plan_->StopProducing(); - } - - status_ = 2; - } }; std::shared_ptr ExecPlan_prepare( From e860650138197dc116a41b1a17c6260552265398 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 22 Aug 2022 10:37:00 -0300 Subject: [PATCH 11/34] more sane but still deadlocking stop_producing --- r/src/compute-exec.cpp | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 9e33efa75aa..58b7383fbbe 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -93,7 +93,11 @@ class ExecPlanReader : public arrow::RecordBatchReader { return arrow::Status::OK(); } - arrow::Status Close() { return arrow::Status::OK(); } + arrow::Status Close() { + // plan_.reset(); + // status_ = 2; + return arrow::Status::OK(); + } const std::shared_ptr& Plan() { return plan_; } @@ -101,23 +105,24 @@ class ExecPlanReader : public arrow::RecordBatchReader { std::shared_ptr schema_; std::shared_ptr plan_; arrow::AsyncGenerator> sink_gen_; + std::shared_ptr stop_producing_; int status_; arrow::Status StartProducing() { - // If the generator is destroyed before being completely drained, inform plan - std::shared_ptr plan(plan_); - std::shared_ptr stop_producing{nullptr, [plan](...) { - bool not_finished_yet = - plan->finished().TryAddCallback([&plan] { - return [plan](const arrow::Status&) {}; - }); - - if (not_finished_yet) { - plan->StopProducing(); - } - }}; ARROW_RETURN_NOT_OK(plan_->StartProducing()); status_ = 1; + + // If the generator is destroyed before being completely drained, inform plan + const std::shared_ptr plan(plan_); + stop_producing_ = {nullptr, [plan](...) { + bool not_finished_yet = plan->finished().TryAddCallback( + [&plan] { return [plan](const arrow::Status&) {}; }); + + if (not_finished_yet) { + plan->StopProducing(); + } + }}; + return arrow::Status::OK(); } }; @@ -163,7 +168,7 @@ std::shared_ptr ExecPlan_prepare( out_schema = out_schema->WithMetadata(kv); } - return std::make_shared(plan, out_schema, std::move(sink_gen)); + return std::make_shared(plan, out_schema, sink_gen); } // [[arrow::export]] From 7a545848c14b0bbc217ee866841c0b93463de0d6 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 22 Aug 2022 10:43:13 -0300 Subject: [PATCH 12/34] uncomment --- r/src/compute-exec.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 58b7383fbbe..ec61bc3fe28 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -94,8 +94,8 @@ class ExecPlanReader : public arrow::RecordBatchReader { } arrow::Status Close() { - // plan_.reset(); - // status_ = 2; + plan_.reset(); + status_ = 2; return arrow::Status::OK(); } From 7093723de189034c79c419151a62e6ae4690a9a1 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 23 Aug 2022 10:58:46 -0300 Subject: [PATCH 13/34] go back to the other approach --- r/src/compute-exec.cpp | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index ec61bc3fe28..18f4a6c53ec 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -88,42 +88,47 @@ class ExecPlanReader : public arrow::RecordBatchReader { *batch_out = batch_result.ValueUnsafe(); } else { batch_out->reset(); + StopProducing(); } return arrow::Status::OK(); } arrow::Status Close() { - plan_.reset(); - status_ = 2; + StopProducing(); return arrow::Status::OK(); } const std::shared_ptr& Plan() { return plan_; } + ~ExecPlanReader() { StopProducing(); } + private: std::shared_ptr schema_; std::shared_ptr plan_; arrow::AsyncGenerator> sink_gen_; - std::shared_ptr stop_producing_; int status_; arrow::Status StartProducing() { ARROW_RETURN_NOT_OK(plan_->StartProducing()); status_ = 1; + return arrow::Status::OK(); + } - // If the generator is destroyed before being completely drained, inform plan - const std::shared_ptr plan(plan_); - stop_producing_ = {nullptr, [plan](...) { - bool not_finished_yet = plan->finished().TryAddCallback( - [&plan] { return [plan](const arrow::Status&) {}; }); + void StopProducing() { + if (status_ == 1) { + std::shared_ptr plan(plan_); + bool not_finished_yet = plan_->finished().TryAddCallback( + [&plan] { return [plan](const arrow::Status&) {}; }); - if (not_finished_yet) { - plan->StopProducing(); - } - }}; + if (not_finished_yet) { + plan_->StopProducing(); + } - return arrow::Status::OK(); + status_ = 2; + plan_.reset(); + sink_gen_ = arrow::MakeEmptyGenerator>(); + } } }; From 27143d601fc527cdb98251be8b2aecccc8e639c9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 25 Aug 2022 09:41:00 -0300 Subject: [PATCH 14/34] remove unnecessary Close() --- r/src/recordbatchreader.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index 87805204ae2..21c506af4ab 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -45,8 +45,6 @@ cpp11::list RecordBatchReader__batches( const std::shared_ptr& reader) { auto result = RunWithCapturedRIfPossible( [&]() { return reader->ToRecordBatches(); }); - - arrow::StopIfNotOk(reader->Close()); return arrow::r::to_r_list(ValueOrStop(result)); } @@ -120,7 +118,6 @@ std::shared_ptr Table__from_RecordBatchReader( auto result = RunWithCapturedRIfPossible>( [&]() { return reader->ToTable(); }); - arrow::StopIfNotOk(reader->Close()); return ValueOrStop(result); } From afe61cd3d99f5c50a31d71ca3556c8e759a95b6d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 25 Aug 2022 09:50:13 -0300 Subject: [PATCH 15/34] maybe fix python test --- r/R/python.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/r/R/python.R b/r/R/python.R index 023d914f16a..6d496062bc7 100644 --- a/r/R/python.R +++ b/r/R/python.R @@ -125,7 +125,8 @@ py_to_r.pyarrow.lib.Table <- function(x, ...) { ) r_rbr <- maybe_py_to_r(py_rbr) - r_rbr$read_table() + thread_safe_rbr <- map_batches(r_rbr, identity, .schema = r_rbr$schema) + thread_safe_rbr$read_table() } py_to_r.pyarrow.lib.Schema <- function(x, ...) { From bcd17bcfd56b38530c74768c3f83dbb5da36a9f6 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 26 Aug 2022 12:00:24 -0300 Subject: [PATCH 16/34] with ExecPlanReader R6 --- r/R/arrowExports.R | 20 +++++--- r/R/dplyr.R | 9 ++-- r/R/query-engine.R | 51 +++++++------------- r/src/arrowExports.cpp | 54 ++++++++++++--------- r/src/arrow_types.h | 2 + r/src/compute-exec.cpp | 93 ++++++++++++++++++++++--------------- r/src/recordbatchreader.cpp | 9 +--- 7 files changed, 129 insertions(+), 109 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 8151e565c09..72ef7c8eca9 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -404,22 +404,30 @@ ExecPlan_create <- function(use_threads) { .Call(`_arrow_ExecPlan_create`, use_threads) } +ExecPlanReader__batches <- function(reader) { + .Call(`_arrow_ExecPlanReader__batches`, reader) +} + +Table__from_ExecPlanReader <- function(reader) { + .Call(`_arrow_Table__from_ExecPlanReader`, reader) +} + +ExecPlanReader__Plan <- function(reader) { + .Call(`_arrow_ExecPlanReader__Plan`, reader) +} + ExecPlan_run <- function(plan, final_node, sort_options, metadata, head) { .Call(`_arrow_ExecPlan_run`, plan, final_node, sort_options, metadata, head) } -ExecPlan_read_table <- function(plan, final_node, sort_options, metadata, head) { - .Call(`_arrow_ExecPlan_read_table`, plan, final_node, sort_options, metadata, head) +ExecPlan_ToString <- function(plan) { + .Call(`_arrow_ExecPlan_ToString`, plan) } ExecNode_output_schema <- function(node) { .Call(`_arrow_ExecNode_output_schema`, node) } -ExecPlan_BuildAndShow <- function(plan, final_node, sort_options, metadata, head) { - .Call(`_arrow_ExecPlan_BuildAndShow`, plan, final_node, sort_options, metadata, head) -} - ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) { .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, materialized_field_names) } diff --git a/r/R/dplyr.R b/r/R/dplyr.R index dffe269199c..86132d8ae4a 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -266,7 +266,7 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) { #' show_exec_plan() show_exec_plan <- function(x) { adq <- as_adq(x) - plan <- ExecPlan$create() + # do not show the plan if we have a nested query (as this will force the # evaluation of the inner query/queries) # TODO see if we can remove after ARROW-16628 @@ -274,8 +274,11 @@ show_exec_plan <- function(x) { warn("The `ExecPlan` cannot be printed for a nested query.") return(invisible(x)) } - final_node <- plan$Build(adq) - cat(plan$BuildAndShow(final_node)) + + result <- as_record_batch_reader(adq) + cat(result$Plan()$ToString()) + result$Close() + invisible(x) } diff --git a/r/R/query-engine.R b/r/R/query-engine.R index bc07e4fca61..300f011398a 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -259,39 +259,8 @@ ExecPlan <- R6Class("ExecPlan", ... ) }, - # SinkNodes (involved in arrange and/or head/tail operations) are created in - # ExecPlan_run and are not captured by the regulat print method. We take a - # similar approach to expose them before calling the print method. - BuildAndShow = function(node) { - # a section of this code is copied from `Run()` - the 2 need to be in sync - # Start of chunk copied from `Run()` - - assert_is(node, "ExecNode") - - # Sorting and head/tail (if sorted) are handled in the SinkNode, - # created in ExecPlan_run - sorting <- node$extras$sort %||% list() - select_k <- node$extras$head %||% -1L - has_sorting <- length(sorting) > 0 - if (has_sorting) { - if (!is.null(node$extras$tail)) { - # Reverse the sort order and take the top K, then after we'll reverse - # the resulting rows so that it is ordered as expected - sorting$orders <- !sorting$orders - select_k <- node$extras$tail - } - sorting$orders <- as.integer(sorting$orders) - } - - # End of chunk copied from `Run()` - - ExecPlan_BuildAndShow( - self, - node, - sorting, - character(), - select_k - ) + ToString = function() { + ExecPlan_ToString(self) } ) ) @@ -376,6 +345,22 @@ ExecNode <- R6Class("ExecNode", ) ) +ExecPlanReader <- R6Class("ExecPlanReader", + inherit = RecordBatchReader, + public = list( + batches = function() ExecPlanReader__batches(self), + read_table = function() Table__from_ExecPlanReader(self), + Plan = function() ExecPlanReader__Plan(self), + ToString = function() { + paste( + super$ToString(), + ExecPlanReader__ToString(self), + collapse = "\n" + ) + } + ) +) + do_exec_plan_substrait <- function(substrait_plan) { if (is.string(substrait_plan)) { substrait_plan <- substrait__internal__SubstraitFromJSON(substrait_plan) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 448a500e7df..3b810e5b779 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -869,7 +869,31 @@ BEGIN_CPP11 END_CPP11 } // compute-exec.cpp -std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head); +cpp11::list ExecPlanReader__batches(const std::shared_ptr& reader); +extern "C" SEXP _arrow_ExecPlanReader__batches(SEXP reader_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type reader(reader_sexp); + return cpp11::as_sexp(ExecPlanReader__batches(reader)); +END_CPP11 +} +// compute-exec.cpp +std::shared_ptr Table__from_ExecPlanReader(const std::shared_ptr& reader); +extern "C" SEXP _arrow_Table__from_ExecPlanReader(SEXP reader_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type reader(reader_sexp); + return cpp11::as_sexp(Table__from_ExecPlanReader(reader)); +END_CPP11 +} +// compute-exec.cpp +std::shared_ptr ExecPlanReader__Plan(const std::shared_ptr& reader); +extern "C" SEXP _arrow_ExecPlanReader__Plan(SEXP reader_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type reader(reader_sexp); + return cpp11::as_sexp(ExecPlanReader__Plan(reader)); +END_CPP11 +} +// compute-exec.cpp +std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head); extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){ BEGIN_CPP11 arrow::r::Input&>::type plan(plan_sexp); @@ -881,15 +905,11 @@ BEGIN_CPP11 END_CPP11 } // compute-exec.cpp -std::shared_ptr ExecPlan_read_table(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head); -extern "C" SEXP _arrow_ExecPlan_read_table(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){ +std::string ExecPlan_ToString(const std::shared_ptr& plan); +extern "C" SEXP _arrow_ExecPlan_ToString(SEXP plan_sexp){ BEGIN_CPP11 arrow::r::Input&>::type plan(plan_sexp); - arrow::r::Input&>::type final_node(final_node_sexp); - arrow::r::Input::type sort_options(sort_options_sexp); - arrow::r::Input::type metadata(metadata_sexp); - arrow::r::Input::type head(head_sexp); - return cpp11::as_sexp(ExecPlan_read_table(plan, final_node, sort_options, metadata, head)); + return cpp11::as_sexp(ExecPlan_ToString(plan)); END_CPP11 } // compute-exec.cpp @@ -901,18 +921,6 @@ BEGIN_CPP11 END_CPP11 } // compute-exec.cpp -std::string ExecPlan_BuildAndShow(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head); -extern "C" SEXP _arrow_ExecPlan_BuildAndShow(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type plan(plan_sexp); - arrow::r::Input&>::type final_node(final_node_sexp); - arrow::r::Input::type sort_options(sort_options_sexp); - arrow::r::Input::type metadata(metadata_sexp); - arrow::r::Input::type head(head_sexp); - return cpp11::as_sexp(ExecPlan_BuildAndShow(plan, final_node, sort_options, metadata, head)); -END_CPP11 -} -// compute-exec.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr ExecNode_Scan(const std::shared_ptr& plan, const std::shared_ptr& dataset, const std::shared_ptr& filter, std::vector materialized_field_names); extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){ @@ -5287,10 +5295,12 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_io___CompressedOutputStream__Make", (DL_FUNC) &_arrow_io___CompressedOutputStream__Make, 2}, { "_arrow_io___CompressedInputStream__Make", (DL_FUNC) &_arrow_io___CompressedInputStream__Make, 2}, { "_arrow_ExecPlan_create", (DL_FUNC) &_arrow_ExecPlan_create, 1}, + { "_arrow_ExecPlanReader__batches", (DL_FUNC) &_arrow_ExecPlanReader__batches, 1}, + { "_arrow_Table__from_ExecPlanReader", (DL_FUNC) &_arrow_Table__from_ExecPlanReader, 1}, + { "_arrow_ExecPlanReader__Plan", (DL_FUNC) &_arrow_ExecPlanReader__Plan, 1}, { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 5}, - { "_arrow_ExecPlan_read_table", (DL_FUNC) &_arrow_ExecPlan_read_table, 5}, + { "_arrow_ExecPlan_ToString", (DL_FUNC) &_arrow_ExecPlan_ToString, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, - { "_arrow_ExecPlan_BuildAndShow", (DL_FUNC) &_arrow_ExecPlan_BuildAndShow, 5}, { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14}, { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index d9fee37e7f1..dd0dc24449e 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -58,6 +58,8 @@ class ExecNode; } // namespace compute } // namespace arrow +class ExecPlanReader; + #if defined(ARROW_R_WITH_PARQUET) #include #endif diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 18f4a6c53ec..9b0c5938d81 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -58,11 +58,26 @@ std::shared_ptr MakeExecNodeOrStop( class ExecPlanReader : public arrow::RecordBatchReader { public: + enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED }; + ExecPlanReader( const std::shared_ptr& plan, const std::shared_ptr& schema, arrow::AsyncGenerator> sink_gen) - : schema_(schema), plan_(plan), sink_gen_(sink_gen), status_(0) {} + : schema_(schema), plan_(plan), sink_gen_(sink_gen), status_(PLAN_NOT_STARTED) {} + + std::string PlanStatus() { + switch (status_) { + case PLAN_NOT_STARTED: + return "PLAN_NOT_STARTED"; + case PLAN_RUNNING: + return "PLAN_RUNNING"; + case PLAN_FINISHED: + return "PLAN_FINISHED"; + default: + return "UNKNOWN"; + } + } std::shared_ptr schema() const { return schema_; } @@ -71,12 +86,12 @@ class ExecPlanReader : public arrow::RecordBatchReader { // If this is the first batch getting pulled, tell the exec plan to // start producing - if (status_ == 0) { + if (status_ == PLAN_NOT_STARTED) { ARROW_RETURN_NOT_OK(StartProducing()); } // If we've closed the reader, this is invalid - if (status_ == 2) { + if (status_ == PLAN_FINISHED) { return arrow::Status::Invalid("ExecPlanReader has been closed"); } @@ -111,12 +126,12 @@ class ExecPlanReader : public arrow::RecordBatchReader { arrow::Status StartProducing() { ARROW_RETURN_NOT_OK(plan_->StartProducing()); - status_ = 1; + status_ = PLAN_RUNNING; return arrow::Status::OK(); } void StopProducing() { - if (status_ == 1) { + if (status_ == PLAN_RUNNING) { std::shared_ptr plan(plan_); bool not_finished_yet = plan_->finished().TryAddCallback( [&plan] { return [plan](const arrow::Status&) {}; }); @@ -124,15 +139,43 @@ class ExecPlanReader : public arrow::RecordBatchReader { if (not_finished_yet) { plan_->StopProducing(); } - - status_ = 2; - plan_.reset(); - sink_gen_ = arrow::MakeEmptyGenerator>(); } + + status_ = PLAN_FINISHED; + plan_.reset(); + sink_gen_ = arrow::MakeEmptyGenerator>(); } }; -std::shared_ptr ExecPlan_prepare( +// [[arrow::export]] +cpp11::list ExecPlanReader__batches( + const std::shared_ptr& reader) { + auto result = RunWithCapturedRIfPossible( + [&]() { return reader->ToRecordBatches(); }); + return arrow::r::to_r_list(ValueOrStop(result)); +} + +// [[arrow::export]] +std::shared_ptr Table__from_ExecPlanReader( + const std::shared_ptr& reader) { + auto result = RunWithCapturedRIfPossible>( + [&]() { return reader->ToTable(); }); + + return ValueOrStop(result); +} + +// [[arrow::export]] +std::shared_ptr ExecPlanReader__Plan( + const std::shared_ptr& reader) { + if (reader->PlanStatus() == "PLAN_FINISHED") { + cpp11::stop("Can't extract ExecPlan from a finished ExecPlanReader"); + } + + return reader->Plan(); +} + +// [[arrow::export]] +std::shared_ptr ExecPlan_run( const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) { @@ -177,25 +220,8 @@ std::shared_ptr ExecPlan_prepare( } // [[arrow::export]] -std::shared_ptr ExecPlan_run( - const std::shared_ptr& plan, - const std::shared_ptr& final_node, cpp11::list sort_options, - cpp11::strings metadata, int64_t head = -1) { - return ExecPlan_prepare(plan, final_node, sort_options, metadata, head); -} - -// [[arrow::export]] -std::shared_ptr ExecPlan_read_table( - const std::shared_ptr& plan, - const std::shared_ptr& final_node, cpp11::list sort_options, - cpp11::strings metadata, int64_t head = -1) { - auto reader = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); - auto result = RunWithCapturedRIfPossible>( - [&]() -> arrow::Result> { - return reader->ToTable(); - }); - - return ValueOrStop(result); +std::string ExecPlan_ToString(const std::shared_ptr& plan) { + return plan->ToString(); } // [[arrow::export]] @@ -204,15 +230,6 @@ std::shared_ptr ExecNode_output_schema( return node->output_schema(); } -// [[arrow::export]] -std::string ExecPlan_BuildAndShow(const std::shared_ptr& plan, - const std::shared_ptr& final_node, - cpp11::list sort_options, cpp11::strings metadata, - int64_t head = -1) { - auto reader = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); - return reader->Plan()->ToString(); -} - #if defined(ARROW_R_WITH_DATASET) #include diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index 21c506af4ab..b6e32a7d479 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -43,9 +43,7 @@ std::shared_ptr RecordBatchReader__ReadNext( // [[arrow::export]] cpp11::list RecordBatchReader__batches( const std::shared_ptr& reader) { - auto result = RunWithCapturedRIfPossible( - [&]() { return reader->ToRecordBatches(); }); - return arrow::r::to_r_list(ValueOrStop(result)); + return arrow::r::to_r_list(ValueOrStop(reader->ToRecordBatches())); } // [[arrow::export]] @@ -115,10 +113,7 @@ std::shared_ptr RecordBatchReader__from_Table( // [[arrow::export]] std::shared_ptr Table__from_RecordBatchReader( const std::shared_ptr& reader) { - auto result = RunWithCapturedRIfPossible>( - [&]() { return reader->ToTable(); }); - - return ValueOrStop(result); + return ValueOrStop(reader->ToTable()); } // [[arrow::export]] From 2f9a827ce539007d3b424f27a6f21f25d64f7a92 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 26 Aug 2022 12:16:06 -0300 Subject: [PATCH 17/34] undo python workaround --- r/R/python.R | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/r/R/python.R b/r/R/python.R index 6d496062bc7..023d914f16a 100644 --- a/r/R/python.R +++ b/r/R/python.R @@ -125,8 +125,7 @@ py_to_r.pyarrow.lib.Table <- function(x, ...) { ) r_rbr <- maybe_py_to_r(py_rbr) - thread_safe_rbr <- map_batches(r_rbr, identity, .schema = r_rbr$schema) - thread_safe_rbr$read_table() + r_rbr$read_table() } py_to_r.pyarrow.lib.Schema <- function(x, ...) { From a5d24a2fb15f8f6338ca14af88a4b48746845e17 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 26 Aug 2022 13:55:50 -0300 Subject: [PATCH 18/34] with tests to enforce laziness --- r/R/arrowExports.R | 4 ++ r/R/query-engine.R | 9 ++-- r/src/arrowExports.cpp | 9 ++++ r/src/compute-exec.cpp | 17 ++++++- r/src/recordbatchreader.cpp | 75 +++++++++++++++++++++------- r/tests/testthat/test-query-engine.R | 29 +++++++++++ 6 files changed, 119 insertions(+), 24 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 72ef7c8eca9..6e76cd64687 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -416,6 +416,10 @@ ExecPlanReader__Plan <- function(reader) { .Call(`_arrow_ExecPlanReader__Plan`, reader) } +ExecPlanReader__PlanStatus <- function(reader) { + .Call(`_arrow_ExecPlanReader__PlanStatus`, reader) +} + ExecPlan_run <- function(plan, final_node, sort_options, metadata, head) { .Call(`_arrow_ExecPlan_run`, plan, final_node, sort_options, metadata, head) } diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 300f011398a..89a8c6a7f37 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -351,11 +351,12 @@ ExecPlanReader <- R6Class("ExecPlanReader", batches = function() ExecPlanReader__batches(self), read_table = function() Table__from_ExecPlanReader(self), Plan = function() ExecPlanReader__Plan(self), + PlanStatus = function() ExecPlanReader__PlanStatus(self), ToString = function() { - paste( - super$ToString(), - ExecPlanReader__ToString(self), - collapse = "\n" + sprintf( + "\n\n%s\n\nSee $Plan() for details.", + self$PlanStatus(), + super$ToString() ) } ) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 3b810e5b779..26ec6e3d9b1 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -893,6 +893,14 @@ BEGIN_CPP11 END_CPP11 } // compute-exec.cpp +std::string ExecPlanReader__PlanStatus(const std::shared_ptr& reader); +extern "C" SEXP _arrow_ExecPlanReader__PlanStatus(SEXP reader_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type reader(reader_sexp); + return cpp11::as_sexp(ExecPlanReader__PlanStatus(reader)); +END_CPP11 +} +// compute-exec.cpp std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head); extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){ BEGIN_CPP11 @@ -5298,6 +5306,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExecPlanReader__batches", (DL_FUNC) &_arrow_ExecPlanReader__batches, 1}, { "_arrow_Table__from_ExecPlanReader", (DL_FUNC) &_arrow_Table__from_ExecPlanReader, 1}, { "_arrow_ExecPlanReader__Plan", (DL_FUNC) &_arrow_ExecPlanReader__Plan, 1}, + { "_arrow_ExecPlanReader__PlanStatus", (DL_FUNC) &_arrow_ExecPlanReader__PlanStatus, 1}, { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 5}, { "_arrow_ExecPlan_ToString", (DL_FUNC) &_arrow_ExecPlan_ToString, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 9b0c5938d81..360a9113843 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -96,10 +96,18 @@ class ExecPlanReader : public arrow::RecordBatchReader { } auto out = sink_gen_().result(); - ARROW_RETURN_NOT_OK(out); + if (!out.ok()) { + StopProducing(); + return out.status(); + } + if (out.ValueUnsafe()) { auto batch_result = out.ValueUnsafe()->ToRecordBatch(schema_, gc_memory_pool()); - ARROW_RETURN_NOT_OK(batch_result); + if (!batch_result.ok()) { + StopProducing(); + return batch_result.status(); + } + *batch_out = batch_result.ValueUnsafe(); } else { batch_out->reset(); @@ -174,6 +182,11 @@ std::shared_ptr ExecPlanReader__Plan( return reader->Plan(); } +// [[arrow::export]] +std::string ExecPlanReader__PlanStatus(const std::shared_ptr& reader) { + return reader->PlanStatus(); +} + // [[arrow::export]] std::shared_ptr ExecPlan_run( const std::shared_ptr& plan, diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index b6e32a7d479..e805c047c71 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -116,27 +116,66 @@ std::shared_ptr Table__from_RecordBatchReader( return ValueOrStop(reader->ToTable()); } +class RecordBatchReaderHead : public arrow::RecordBatchReader { + public: + RecordBatchReaderHead(std::shared_ptr reader, + int64_t num_rows) + : schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {} + + std::shared_ptr schema() const { return schema_; } + + arrow::Status ReadNext(std::shared_ptr* batch_out) { + if (!reader_) { + // Close() has been called + batch_out = nullptr; + return arrow::Status::OK(); + } + + ARROW_RETURN_NOT_OK(reader_->ReadNext(batch_out)); + if (batch_out->get()) { + num_rows_ -= batch_out->get()->num_rows(); + if (num_rows_ < 0) { + auto smaller_batch = + batch_out->get()->Slice(0, batch_out->get()->num_rows() + num_rows_); + *batch_out = smaller_batch; + } + + if (num_rows_ <= 0) { + // We've run out of num_rows before batches + ARROW_RETURN_NOT_OK(Close()); + } + } else { + // We've run out of batches before num_rows + ARROW_RETURN_NOT_OK(Close()); + } + + return arrow::Status::OK(); + } + + arrow::Status Close() { + if (reader_) { + arrow::Status result = reader_->Close(); + reader_.reset(); + return result; + } else { + return arrow::Status::OK(); + } + } + + private: + std::shared_ptr schema_; + std::shared_ptr reader_; + int64_t num_rows_; +}; + // [[arrow::export]] std::shared_ptr RecordBatchReader__Head( const std::shared_ptr& reader, int64_t num_rows) { - auto result = RunWithCapturedRIfPossible>( - [&]() -> arrow::Result> { - std::vector> batches; - arrow::Result> this_batch; - - while (num_rows > 0) { - this_batch = reader->Next(); - ARROW_RETURN_NOT_OK(this_batch); - if (this_batch.ValueUnsafe() == nullptr) break; - batches.push_back(this_batch.ValueUnsafe()->Slice(0, num_rows)); - num_rows -= this_batch.ValueUnsafe()->num_rows(); - } - - ARROW_RETURN_NOT_OK(reader->Close()); - return arrow::RecordBatchReader::Make(std::move(batches), reader->schema()); - }); - - return ValueOrStop(result); + if (num_rows <= 0) { + return ValueOrStop(arrow::RecordBatchReader::Make({}, reader->schema())); + } else { + return std::make_shared(reader, num_rows); + } } // -------- RecordBatchStreamReader diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R index dd87335f876..d67dc83f201 100644 --- a/r/tests/testthat/test-query-engine.R +++ b/r/tests/testthat/test-query-engine.R @@ -17,6 +17,35 @@ library(dplyr, warn.conflicts = FALSE) +test_that("ExecPlanReader does not start evaluating a query", { + rbr <- as_record_batch_reader( + function(x) stop("This query will error if started"), + schema = schema(a = int32()) + ) + + reader <- as_record_batch_reader(as_adq(rbr)) + expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED") + expect_error(reader$read_table(), "This query will error if started") + expect_identical(reader$PlanStatus(), "PLAN_FINISHED") +}) + +test_that("ExecPlanReader evaluates head() lazily", { + reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:10))) + expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED") + + head_reader <- as_record_batch_reader(head(as_adq(reader), 4)) + expect_identical(head_reader$PlanStatus(), "PLAN_NOT_STARTED") + expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED") + + expect_equal( + head_reader$read_table(), + arrow_table(a = 1:4) + ) + + expect_identical(head_reader$PlanStatus(), "PLAN_FINISHED") + expect_identical(reader$PlanStatus(), "PLAN_FINISHED") +}) + test_that("do_exec_plan_substrait can evaluate a simple plan", { skip_if_not_available("substrait") From 9ee0602be31c162a1e7fb7b51b99021b9d41f9d9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 26 Aug 2022 14:11:24 -0300 Subject: [PATCH 19/34] document the rbr subclasses --- r/src/compute-exec.cpp | 11 ++++++++++- r/src/recordbatchreader.cpp | 11 +++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 360a9113843..0a0aa68fc62 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -56,6 +56,15 @@ std::shared_ptr MakeExecNodeOrStop( }); } +// This class is a special RecordBatchReader that holds a reference to the +// underlying exec plan so that (1) it can request that the ExecPlan *stop* +// producing when this object is deleted and (2) it can defer requesting +// the ExecPlan to *start* producing until the first batch has been pulled. +// This allows it to be transformed (e.g., using map_batches() or head()) +// and queried (i.e., used as input to another ExecPlan), at the R level +// while maintaining the ability for the entire plan to be executed at once +// (e.g., to support user-defined functions) or never executed at all (e.g., +// to support printing a nested ExecPlan without having to execute it). class ExecPlanReader : public arrow::RecordBatchReader { public: enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED }; @@ -107,7 +116,7 @@ class ExecPlanReader : public arrow::RecordBatchReader { StopProducing(); return batch_result.status(); } - + *batch_out = batch_result.ValueUnsafe(); } else { batch_out->reset(); diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index e805c047c71..eeb26446173 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -116,6 +116,14 @@ std::shared_ptr Table__from_RecordBatchReader( return ValueOrStop(reader->ToTable()); } +// Because the head() operation can leave a RecordBatchReader whose contents +// will never be drained, we implement a wrapper class here that takes care +// to (1) return only the requested number of rows (or fewer) and (2) Close +// and release the underlying reader as soon as possible. This is mostly +// useful for the ExecPlanReader, whose Close() method also requests +// that the ExecPlan stop producing, but may also be useful for readers +// that point to an open file and whose Close() or delete method releases +// the file. class RecordBatchReaderHead : public arrow::RecordBatchReader { public: RecordBatchReaderHead(std::shared_ptr reader, @@ -172,6 +180,9 @@ class RecordBatchReaderHead : public arrow::RecordBatchReader { std::shared_ptr RecordBatchReader__Head( const std::shared_ptr& reader, int64_t num_rows) { if (num_rows <= 0) { + // If we are never going to pull any batches from this reader, close it + // immediately. + StopIfNotOk(reader->Close()); return ValueOrStop(arrow::RecordBatchReader::Make({}, reader->schema())); } else { return std::make_shared(reader, num_rows); From a91505017b2ffda34dff7349c14130a87934b818 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 26 Aug 2022 14:48:19 -0300 Subject: [PATCH 20/34] exec plan reader head() + tests --- r/NAMESPACE | 1 + r/R/compute.R | 7 ------- r/R/query-engine.R | 9 +++++++++ r/tests/testthat/test-compute.R | 26 ++++++++++++++++++++++++++ r/tests/testthat/test-query-engine.R | 20 ++++++++++++++++++-- 5 files changed, 54 insertions(+), 9 deletions(-) diff --git a/r/NAMESPACE b/r/NAMESPACE index 49db309b8e8..a832828184d 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -95,6 +95,7 @@ S3method(dimnames,ArrowTabular) S3method(head,ArrowDatum) S3method(head,ArrowTabular) S3method(head,Dataset) +S3method(head,ExecPlanReader) S3method(head,RecordBatchReader) S3method(head,Scanner) S3method(head,arrow_dplyr_query) diff --git a/r/R/compute.R b/r/R/compute.R index 636c9146ca3..a144e7d678a 100644 --- a/r/R/compute.R +++ b/r/R/compute.R @@ -385,13 +385,6 @@ register_scalar_function <- function(name, fun, in_type, out_type, update_cache = TRUE ) - # User-defined functions require some special handling - # in the query engine which currently require an opt-in using - # the R_ARROW_COLLECT_WITH_UDF environment variable while this - # behaviour is stabilized. - # TODO(ARROW-17178) remove the need for this! - Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true") - invisible(NULL) } diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 89a8c6a7f37..5e8b9594e6d 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -362,6 +362,15 @@ ExecPlanReader <- R6Class("ExecPlanReader", ) ) +# We need the head() of a ExecPlanReader to also be a ExecPlanReader +# because we need batches() and read_table() to evaluate in a way +# that supports user-defined functions. +#' @export +head.ExecPlanReader <- function(x, n = 6L, ...) { + head_reader <- NextMethod() + as_record_batch_reader(as_adq(head_reader)) +} + do_exec_plan_substrait <- function(substrait_plan) { if (is.string(substrait_plan)) { substrait_plan <- substrait__internal__SubstraitFromJSON(substrait_plan) diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index f43f588760f..d3b891f53f7 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -323,3 +323,29 @@ test_that("nested exec plans can contain user-defined functions", { result <- collect_plan_with_head() expect_equal(nrow(result), 11) }) + +test_that("head() on exec plan containing user-defined functions", { + skip_if_not_available("dataset") + skip_if_not(CanRunWithCapturedR()) + + register_scalar_function( + "times_32", + function(context, x) x * 32.0, + int32(), + float64(), + auto_convert = TRUE + ) + on.exit({ + unregister_binding("times_32", update_cache = TRUE) + # TODO(ARROW-17178) remove the need for this! + Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") + }) + + result <- record_batch(a = 1:1000) %>% + dplyr::mutate(b = times_32(a)) %>% + as_record_batch_reader() %>% + head(11) %>% + dplyr::collect() + + expect_equal(nrow(result), 11) +}) diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R index d67dc83f201..1f5e6c96d7e 100644 --- a/r/tests/testthat/test-query-engine.R +++ b/r/tests/testthat/test-query-engine.R @@ -29,11 +29,11 @@ test_that("ExecPlanReader does not start evaluating a query", { expect_identical(reader$PlanStatus(), "PLAN_FINISHED") }) -test_that("ExecPlanReader evaluates head() lazily", { +test_that("ExecPlanReader evaluates nested exec plans lazily", { reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:10))) expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED") - head_reader <- as_record_batch_reader(head(as_adq(reader), 4)) + head_reader <- head(reader, 4) expect_identical(head_reader$PlanStatus(), "PLAN_NOT_STARTED") expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED") @@ -46,6 +46,22 @@ test_that("ExecPlanReader evaluates head() lazily", { expect_identical(reader$PlanStatus(), "PLAN_FINISHED") }) +test_that("ExecPlanReader evaluates head() lazily", { + reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:10))) + expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED") + + head_reader <- head(reader, 4) + expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED") + + expect_equal( + head_reader$read_table(), + arrow_table(a = 1:4) + ) + + expect_identical(head_reader$PlanStatus(), "PLAN_FINISHED") + expect_identical(reader$PlanStatus(), "PLAN_FINISHED") +}) + test_that("do_exec_plan_substrait can evaluate a simple plan", { skip_if_not_available("substrait") From 40a99b259d4a5752fa101d22c70f66b949538be4 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 26 Aug 2022 14:52:11 -0300 Subject: [PATCH 21/34] remove collect with udf kludge --- r/tests/testthat/test-compute.R | 45 +++++---------------------------- 1 file changed, 7 insertions(+), 38 deletions(-) diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index d3b891f53f7..11c37519ae5 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -91,11 +91,7 @@ test_that("register_scalar_function() adds a compute function to the registry", int32(), float64(), auto_convert = TRUE ) - on.exit({ - unregister_binding("times_32", update_cache = TRUE) - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") - }) + on.exit(unregister_binding("times_32", update_cache = TRUE)) expect_true("times_32" %in% names(asNamespace("arrow")$.cache$functions)) expect_true("times_32" %in% list_compute_functions()) @@ -127,11 +123,7 @@ test_that("arrow_scalar_function() with bad return type errors", { int32(), float64() ) - on.exit({ - unregister_binding("times_32_bad_return_type_array", update_cache = TRUE) - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") - }) + on.exit(unregister_binding("times_32_bad_return_type_array", update_cache = TRUE)) expect_error( call_function("times_32_bad_return_type_array", Array$create(1L)), @@ -144,11 +136,7 @@ test_that("arrow_scalar_function() with bad return type errors", { int32(), float64() ) - on.exit({ - unregister_binding("times_32_bad_return_type_scalar", update_cache = TRUE) - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") - }) + on.exit(unregister_binding("times_32_bad_return_type_scalar", update_cache = TRUE)) expect_error( call_function("times_32_bad_return_type_scalar", Array$create(1L)), @@ -166,11 +154,7 @@ test_that("register_scalar_function() can register multiple kernels", { out_type = function(in_types) in_types[[1]], auto_convert = TRUE ) - on.exit({ - unregister_binding("times_32", update_cache = TRUE) - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") - }) + on.exit(unregister_binding("times_32", update_cache = TRUE)) expect_equal( call_function("times_32", Scalar$create(1L, int32())), @@ -189,9 +173,6 @@ test_that("register_scalar_function() can register multiple kernels", { }) test_that("register_scalar_function() errors for unsupported specifications", { - # TODO(ARROW-17178) remove the need for this! - on.exit(Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")) - expect_error( register_scalar_function( "no_kernels", @@ -256,11 +237,7 @@ test_that("user-defined functions work during multi-threaded execution", { float64(), auto_convert = TRUE ) - on.exit({ - unregister_binding("times_32", update_cache = TRUE) - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") - }) + on.exit(unregister_binding("times_32", update_cache = TRUE)) # check a regular collect() result <- open_dataset(tf_dataset) %>% @@ -293,11 +270,7 @@ test_that("nested exec plans can contain user-defined functions", { float64(), auto_convert = TRUE ) - on.exit({ - unregister_binding("times_32", update_cache = TRUE) - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") - }) + on.exit(unregister_binding("times_32", update_cache = TRUE)) stream_plan_with_udf <- function() { record_batch(a = 1:1000) %>% @@ -335,11 +308,7 @@ test_that("head() on exec plan containing user-defined functions", { float64(), auto_convert = TRUE ) - on.exit({ - unregister_binding("times_32", update_cache = TRUE) - # TODO(ARROW-17178) remove the need for this! - Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF") - }) + on.exit(unregister_binding("times_32", update_cache = TRUE)) result <- record_batch(a = 1:1000) %>% dplyr::mutate(b = times_32(a)) %>% From 020d338236d7e538d2795e460c11c86e3ad5ea43 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 26 Aug 2022 16:47:24 -0300 Subject: [PATCH 22/34] nix the custom head method --- r/NAMESPACE | 1 - r/R/query-engine.R | 9 --------- r/tests/testthat/test-query-engine.R | 3 --- 3 files changed, 13 deletions(-) diff --git a/r/NAMESPACE b/r/NAMESPACE index a832828184d..49db309b8e8 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -95,7 +95,6 @@ S3method(dimnames,ArrowTabular) S3method(head,ArrowDatum) S3method(head,ArrowTabular) S3method(head,Dataset) -S3method(head,ExecPlanReader) S3method(head,RecordBatchReader) S3method(head,Scanner) S3method(head,arrow_dplyr_query) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 5e8b9594e6d..89a8c6a7f37 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -362,15 +362,6 @@ ExecPlanReader <- R6Class("ExecPlanReader", ) ) -# We need the head() of a ExecPlanReader to also be a ExecPlanReader -# because we need batches() and read_table() to evaluate in a way -# that supports user-defined functions. -#' @export -head.ExecPlanReader <- function(x, n = 6L, ...) { - head_reader <- NextMethod() - as_record_batch_reader(as_adq(head_reader)) -} - do_exec_plan_substrait <- function(substrait_plan) { if (is.string(substrait_plan)) { substrait_plan <- substrait__internal__SubstraitFromJSON(substrait_plan) diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R index 1f5e6c96d7e..3c956116992 100644 --- a/r/tests/testthat/test-query-engine.R +++ b/r/tests/testthat/test-query-engine.R @@ -34,7 +34,6 @@ test_that("ExecPlanReader evaluates nested exec plans lazily", { expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED") head_reader <- head(reader, 4) - expect_identical(head_reader$PlanStatus(), "PLAN_NOT_STARTED") expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED") expect_equal( @@ -42,7 +41,6 @@ test_that("ExecPlanReader evaluates nested exec plans lazily", { arrow_table(a = 1:4) ) - expect_identical(head_reader$PlanStatus(), "PLAN_FINISHED") expect_identical(reader$PlanStatus(), "PLAN_FINISHED") }) @@ -58,7 +56,6 @@ test_that("ExecPlanReader evaluates head() lazily", { arrow_table(a = 1:4) ) - expect_identical(head_reader$PlanStatus(), "PLAN_FINISHED") expect_identical(reader$PlanStatus(), "PLAN_FINISHED") }) From 93764f6aed29cec65c4d90afd1347a182b63acb6 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 29 Aug 2022 09:09:35 -0300 Subject: [PATCH 23/34] wait for idle thread pool on valgrind job --- r/R/arrow-package.R | 8 ++++++++ r/R/arrowExports.R | 4 ++++ r/src/arrowExports.cpp | 9 +++++++++ r/src/threadpool.cpp | 14 ++++++++++++++ 4 files changed, 35 insertions(+) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index 53fb0280a50..c13737840c4 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -83,6 +83,14 @@ invisible() } +.onUnload <- function(...) { + # When running valgrind we need to wait for the thread pools to finish + # running background tasks or else spurious memory leaks may be reported. + if (on_linux_dev()) { + WaitForIdleThreadPool() + } +} + configure_tzdb <- function() { # This is needed on Windows to support timezone-aware calculations if (requireNamespace("tzdb", quietly = TRUE)) { diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 6e76cd64687..269108188ad 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -2040,6 +2040,10 @@ SetIOThreadPoolCapacity <- function(threads) { invisible(.Call(`_arrow_SetIOThreadPoolCapacity`, threads)) } +WaitForIdleThreadPool <- function() { + invisible(.Call(`_arrow_WaitForIdleThreadPool`)) +} + Array__infer_type <- function(x) { .Call(`_arrow_Array__infer_type`, x) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 26ec6e3d9b1..39ec8448fc0 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5133,6 +5133,14 @@ BEGIN_CPP11 return R_NilValue; END_CPP11 } +// threadpool.cpp +void WaitForIdleThreadPool(); +extern "C" SEXP _arrow_WaitForIdleThreadPool(){ +BEGIN_CPP11 + WaitForIdleThreadPool(); + return R_NilValue; +END_CPP11 +} // type_infer.cpp std::shared_ptr Array__infer_type(SEXP x); extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){ @@ -5712,6 +5720,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, { "_arrow_GetIOThreadPoolCapacity", (DL_FUNC) &_arrow_GetIOThreadPoolCapacity, 0}, { "_arrow_SetIOThreadPoolCapacity", (DL_FUNC) &_arrow_SetIOThreadPoolCapacity, 1}, + { "_arrow_WaitForIdleThreadPool", (DL_FUNC) &_arrow_WaitForIdleThreadPool, 0}, { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, {NULL, NULL, 0} }; diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp index 83e7a7ecfe6..9630046a487 100644 --- a/r/src/threadpool.cpp +++ b/r/src/threadpool.cpp @@ -54,3 +54,17 @@ int GetIOThreadPoolCapacity() { return arrow::io::GetIOThreadPoolCapacity(); } void SetIOThreadPoolCapacity(int threads) { StopIfNotOk(arrow::io::SetIOThreadPoolCapacity(threads)); } + +namespace arrow { +namespace io { +namespace internal { +arrow::internal::ThreadPool* GetIOThreadPool(); +} +} // namespace io +} // namespace arrow + +// [[arrow::export]] +void WaitForIdleThreadPool() { + arrow::internal::GetCpuThreadPool()->WaitForIdle(); + arrow::io::internal::GetIOThreadPool()->WaitForIdle(); +} From 0b8ddfa9be788eb2458ed6040518a7bf51a5fe8c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 31 Aug 2022 10:53:10 -0300 Subject: [PATCH 24/34] remove thread pool stuff (it didn't work) --- r/R/arrow-package.R | 8 -------- r/R/arrowExports.R | 4 ---- r/src/arrowExports.cpp | 9 --------- r/src/threadpool.cpp | 14 -------------- 4 files changed, 35 deletions(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index c13737840c4..53fb0280a50 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -83,14 +83,6 @@ invisible() } -.onUnload <- function(...) { - # When running valgrind we need to wait for the thread pools to finish - # running background tasks or else spurious memory leaks may be reported. - if (on_linux_dev()) { - WaitForIdleThreadPool() - } -} - configure_tzdb <- function() { # This is needed on Windows to support timezone-aware calculations if (requireNamespace("tzdb", quietly = TRUE)) { diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 269108188ad..6e76cd64687 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -2040,10 +2040,6 @@ SetIOThreadPoolCapacity <- function(threads) { invisible(.Call(`_arrow_SetIOThreadPoolCapacity`, threads)) } -WaitForIdleThreadPool <- function() { - invisible(.Call(`_arrow_WaitForIdleThreadPool`)) -} - Array__infer_type <- function(x) { .Call(`_arrow_Array__infer_type`, x) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 39ec8448fc0..26ec6e3d9b1 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5133,14 +5133,6 @@ BEGIN_CPP11 return R_NilValue; END_CPP11 } -// threadpool.cpp -void WaitForIdleThreadPool(); -extern "C" SEXP _arrow_WaitForIdleThreadPool(){ -BEGIN_CPP11 - WaitForIdleThreadPool(); - return R_NilValue; -END_CPP11 -} // type_infer.cpp std::shared_ptr Array__infer_type(SEXP x); extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){ @@ -5720,7 +5712,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, { "_arrow_GetIOThreadPoolCapacity", (DL_FUNC) &_arrow_GetIOThreadPoolCapacity, 0}, { "_arrow_SetIOThreadPoolCapacity", (DL_FUNC) &_arrow_SetIOThreadPoolCapacity, 1}, - { "_arrow_WaitForIdleThreadPool", (DL_FUNC) &_arrow_WaitForIdleThreadPool, 0}, { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, {NULL, NULL, 0} }; diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp index 9630046a487..83e7a7ecfe6 100644 --- a/r/src/threadpool.cpp +++ b/r/src/threadpool.cpp @@ -54,17 +54,3 @@ int GetIOThreadPoolCapacity() { return arrow::io::GetIOThreadPoolCapacity(); } void SetIOThreadPoolCapacity(int threads) { StopIfNotOk(arrow::io::SetIOThreadPoolCapacity(threads)); } - -namespace arrow { -namespace io { -namespace internal { -arrow::internal::ThreadPool* GetIOThreadPool(); -} -} // namespace io -} // namespace arrow - -// [[arrow::export]] -void WaitForIdleThreadPool() { - arrow::internal::GetCpuThreadPool()->WaitForIdle(); - arrow::io::internal::GetIOThreadPool()->WaitForIdle(); -} From 05f706739f0f8e1bf6f762e3070b616900246e27 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 31 Aug 2022 10:56:58 -0300 Subject: [PATCH 25/34] simplify arrow_dplyr_query to table --- r/R/table.R | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/r/R/table.R b/r/R/table.R index 1a5c96ef50a..c5291257792 100644 --- a/r/R/table.R +++ b/r/R/table.R @@ -328,8 +328,5 @@ as_arrow_table.RecordBatchReader <- function(x, ...) { #' @rdname as_arrow_table #' @export as_arrow_table.arrow_dplyr_query <- function(x, ...) { - # See query-engine.R for ExecPlan/Nodes - plan <- ExecPlan$create() - final_node <- plan$Build(x) - as_arrow_table(plan$Run(final_node)) + as_arrow_table(as_record_batch_reader(x)) } From b6b91026729481f705ba42cfd301b1627f7bb04d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 31 Aug 2022 11:15:40 -0300 Subject: [PATCH 26/34] add test for lazy head() --- r/tests/testthat/test-query-engine.R | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R index 3c956116992..7e84b202c4d 100644 --- a/r/tests/testthat/test-query-engine.R +++ b/r/tests/testthat/test-query-engine.R @@ -59,6 +59,32 @@ test_that("ExecPlanReader evaluates head() lazily", { expect_identical(reader$PlanStatus(), "PLAN_FINISHED") }) +test_that("ExecPlanReader evaluates head() lazily", { + skip_if_not(CanRunWithCapturedR()) + + max_batches <- 100L + reader <- as_record_batch_reader(function() { + # Make sure the failure mode is more informative an infinite loop + max_batches <<- max_batches - 1L + if (max_batches < 0) { + stop("head() on arrow_dplyr_query was not lazy", call. = FALSE) + } + + record_batch( + line = c( + "this is the RecordBatchReader that never ends", + "yes it goes on and on my friends", + "some ExecPlan started pulling from it", + "not knowing what it was, and hopefully won't", + "continue pulling on forever just because" + ) + ) + }, schema = schema(line = string())) + + query <- head(as_adq(reader), 100) + expect_identical(as_arrow_table(query)$num_rows, 100L) +}) + test_that("do_exec_plan_substrait can evaluate a simple plan", { skip_if_not_available("substrait") From 5ec7c1a34f80ef26d243950c6148528d776aaec2 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 31 Aug 2022 11:36:54 -0300 Subject: [PATCH 27/34] don't use R function call recordbatchreader just now --- r/tests/testthat/test-query-engine.R | 43 ++++++++++++++-------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R index 7e84b202c4d..4c95e0989b1 100644 --- a/r/tests/testthat/test-query-engine.R +++ b/r/tests/testthat/test-query-engine.R @@ -60,29 +60,30 @@ test_that("ExecPlanReader evaluates head() lazily", { }) test_that("ExecPlanReader evaluates head() lazily", { - skip_if_not(CanRunWithCapturedR()) - - max_batches <- 100L - reader <- as_record_batch_reader(function() { - # Make sure the failure mode is more informative an infinite loop - max_batches <<- max_batches - 1L - if (max_batches < 0) { - stop("head() on arrow_dplyr_query was not lazy", call. = FALSE) - } - - record_batch( - line = c( - "this is the RecordBatchReader that never ends", - "yes it goes on and on my friends", - "some ExecPlan started pulling from it", - "not knowing what it was, and hopefully won't", - "continue pulling on forever just because" - ) + # make a 500-row RecordBatchReader + reader <- RecordBatchReader$create( + batches = rep( + list( + record_batch( + line = c( + "this is the RecordBatchReader that never ends", + "yes it goes on and on my friends", + "some ExecPlan started pulling from it", + "not knowing what it was, and hopefully won't", + "continue pulling on forever just because" + ) + ) + ), + 100L ) - }, schema = schema(line = string())) + ) + + # But only get 10 rows from it + query <- head(as_adq(reader), 10) + expect_identical(as_arrow_table(query)$num_rows, 10L) - query <- head(as_adq(reader), 100) - expect_identical(as_arrow_table(query)$num_rows, 100L) + # make sure there are some rows left + expect_true(reader$read_table()$num_rows > 0) }) test_that("do_exec_plan_substrait can evaluate a simple plan", { From adc61f68f01abd07263e24e367f60a71bc4faf04 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 31 Aug 2022 13:35:58 -0300 Subject: [PATCH 28/34] add a note about lazy stopping but don't test it quite yet --- r/tests/testthat/test-query-engine.R | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R index 4c95e0989b1..dce93f72306 100644 --- a/r/tests/testthat/test-query-engine.R +++ b/r/tests/testthat/test-query-engine.R @@ -82,8 +82,9 @@ test_that("ExecPlanReader evaluates head() lazily", { query <- head(as_adq(reader), 10) expect_identical(as_arrow_table(query)$num_rows, 10L) - # make sure there are some rows left - expect_true(reader$read_table()$num_rows > 0) + # Depending on exactly how quickly background threads respond to the + # request to cancel, reader$read_table()$num_rows > 0 may or may not + # evaluate to TRUE (i.e., the reader may or may not be completely drained). }) test_that("do_exec_plan_substrait can evaluate a simple plan", { From 921296fea0578e3bac84672e6afc4bb9e4e5a997 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 12 Sep 2022 12:44:08 -0300 Subject: [PATCH 29/34] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- r/src/compute-exec.cpp | 10 +++++----- r/src/recordbatchreader.cpp | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 0a0aa68fc62..f13e706b234 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -75,7 +75,7 @@ class ExecPlanReader : public arrow::RecordBatchReader { arrow::AsyncGenerator> sink_gen) : schema_(schema), plan_(plan), sink_gen_(sink_gen), status_(PLAN_NOT_STARTED) {} - std::string PlanStatus() { + std::string PlanStatus() const { switch (status_) { case PLAN_NOT_STARTED: return "PLAN_NOT_STARTED"; @@ -88,9 +88,9 @@ class ExecPlanReader : public arrow::RecordBatchReader { } } - std::shared_ptr schema() const { return schema_; } + std::shared_ptr schema() const override { return schema_; } - arrow::Status ReadNext(std::shared_ptr* batch_out) { + arrow::Status ReadNext(std::shared_ptr* batch_out) override { // TODO(ARROW-11841) check a StopToken to potentially cancel this plan // If this is the first batch getting pulled, tell the exec plan to @@ -126,12 +126,12 @@ class ExecPlanReader : public arrow::RecordBatchReader { return arrow::Status::OK(); } - arrow::Status Close() { + arrow::Status Close() override { StopProducing(); return arrow::Status::OK(); } - const std::shared_ptr& Plan() { return plan_; } + const std::shared_ptr& Plan() const { return plan_; } ~ExecPlanReader() { StopProducing(); } diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index eeb26446173..d0c52acc416 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -130,9 +130,9 @@ class RecordBatchReaderHead : public arrow::RecordBatchReader { int64_t num_rows) : schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {} - std::shared_ptr schema() const { return schema_; } + std::shared_ptr schema() const override { return schema_; } - arrow::Status ReadNext(std::shared_ptr* batch_out) { + arrow::Status ReadNext(std::shared_ptr* batch_out) override { if (!reader_) { // Close() has been called batch_out = nullptr; @@ -160,7 +160,7 @@ class RecordBatchReaderHead : public arrow::RecordBatchReader { return arrow::Status::OK(); } - arrow::Status Close() { + arrow::Status Close() override { if (reader_) { arrow::Status result = reader_->Close(); reader_.reset(); From 4fece2abd7eebc264971781ab5b8275d7816deec Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 14 Sep 2022 09:10:44 -0300 Subject: [PATCH 30/34] make the test less funny and less confusing --- r/tests/testthat/test-query-engine.R | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R index dce93f72306..f2190eb6684 100644 --- a/r/tests/testthat/test-query-engine.R +++ b/r/tests/testthat/test-query-engine.R @@ -60,25 +60,15 @@ test_that("ExecPlanReader evaluates head() lazily", { }) test_that("ExecPlanReader evaluates head() lazily", { - # make a 500-row RecordBatchReader + # Make a rather long RecordBatchReader reader <- RecordBatchReader$create( batches = rep( - list( - record_batch( - line = c( - "this is the RecordBatchReader that never ends", - "yes it goes on and on my friends", - "some ExecPlan started pulling from it", - "not knowing what it was, and hopefully won't", - "continue pulling on forever just because" - ) - ) - ), + list(record_batch(line = letters)), 100L ) ) - # But only get 10 rows from it + # ...But only get 10 rows from it query <- head(as_adq(reader), 10) expect_identical(as_arrow_table(query)$num_rows, 10L) From df724b40ea22904df8c808a3937f1dc998aedbc0 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 14 Sep 2022 15:11:21 -0300 Subject: [PATCH 31/34] attempt to simplify the call to StopProducing() --- r/src/compute-exec.cpp | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index f13e706b234..afdf3ddc4c7 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -99,9 +99,11 @@ class ExecPlanReader : public arrow::RecordBatchReader { ARROW_RETURN_NOT_OK(StartProducing()); } - // If we've closed the reader, this is invalid + // If we've closed the reader, keep sending nullptr + // (consistent with what most RecordBatchReader subclasses do) if (status_ == PLAN_FINISHED) { - return arrow::Status::Invalid("ExecPlanReader has been closed"); + batch_out->reset(); + return arrow::Status::OK(); } auto out = sink_gen_().result(); @@ -149,13 +151,7 @@ class ExecPlanReader : public arrow::RecordBatchReader { void StopProducing() { if (status_ == PLAN_RUNNING) { - std::shared_ptr plan(plan_); - bool not_finished_yet = plan_->finished().TryAddCallback( - [&plan] { return [plan](const arrow::Status&) {}; }); - - if (not_finished_yet) { - plan_->StopProducing(); - } + plan_->StopProducing(); } status_ = PLAN_FINISHED; From 1e6d571e080372d077e328a6b88eb9bfde45736b Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 14 Sep 2022 16:40:46 -0300 Subject: [PATCH 32/34] comment to remind myself why the complicated callback thing needs to happen --- r/src/compute-exec.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index afdf3ddc4c7..cb2eca6d5c0 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -151,7 +151,17 @@ class ExecPlanReader : public arrow::RecordBatchReader { void StopProducing() { if (status_ == PLAN_RUNNING) { - plan_->StopProducing(); + // We're done with the plan, but it may still need some time + // to finish and clean up after itself. To do this, we give a + // callable with its own copy of the shared_ptr so + // that it can delete itself when it is safe to do so. + std::shared_ptr plan(plan_); + bool not_finished_yet = plan_->finished().TryAddCallback( + [&plan] { return [plan](const arrow::Status&) {}; }); + + if (not_finished_yet) { + plan_->StopProducing(); + } } status_ = PLAN_FINISHED; From 7d8e92f3e5d917f284e5d891c9b7a1149ca8cec9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 15 Sep 2022 09:54:28 -0300 Subject: [PATCH 33/34] update for C++17 --- r/src/compute-exec.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index cb2eca6d5c0..27758f1457a 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -72,7 +72,7 @@ class ExecPlanReader : public arrow::RecordBatchReader { ExecPlanReader( const std::shared_ptr& plan, const std::shared_ptr& schema, - arrow::AsyncGenerator> sink_gen) + arrow::AsyncGenerator> sink_gen) : schema_(schema), plan_(plan), sink_gen_(sink_gen), status_(PLAN_NOT_STARTED) {} std::string PlanStatus() const { @@ -140,7 +140,7 @@ class ExecPlanReader : public arrow::RecordBatchReader { private: std::shared_ptr schema_; std::shared_ptr plan_; - arrow::AsyncGenerator> sink_gen_; + arrow::AsyncGenerator> sink_gen_; int status_; arrow::Status StartProducing() { @@ -166,7 +166,7 @@ class ExecPlanReader : public arrow::RecordBatchReader { status_ = PLAN_FINISHED; plan_.reset(); - sink_gen_ = arrow::MakeEmptyGenerator>(); + sink_gen_ = arrow::MakeEmptyGenerator>(); } }; From 92e04164b966553d0402eb35c4b718e020555470 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 15 Sep 2022 10:02:04 -0300 Subject: [PATCH 34/34] clang-format --- r/src/compute-exec.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 27758f1457a..71dc6d8b2e1 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -69,10 +69,9 @@ class ExecPlanReader : public arrow::RecordBatchReader { public: enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED }; - ExecPlanReader( - const std::shared_ptr& plan, - const std::shared_ptr& schema, - arrow::AsyncGenerator> sink_gen) + ExecPlanReader(const std::shared_ptr& plan, + const std::shared_ptr& schema, + arrow::AsyncGenerator> sink_gen) : schema_(schema), plan_(plan), sink_gen_(sink_gen), status_(PLAN_NOT_STARTED) {} std::string PlanStatus() const {