From f8219cb41aacc374cdfa82d77691cd6bae28a898 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 3 Nov 2022 12:47:12 -0300 Subject: [PATCH 1/3] don't delete source reader --- r/src/recordbatchreader.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index 8e9df121748..9ea4d917016 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -128,12 +128,12 @@ class RecordBatchReaderHead : public arrow::RecordBatchReader { public: RecordBatchReaderHead(std::shared_ptr reader, int64_t num_rows) - : schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {} + : done_(false), schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {} std::shared_ptr schema() const override { return schema_; } arrow::Status ReadNext(std::shared_ptr* batch_out) override { - if (!reader_) { + if (done_) { // Close() has been called batch_out = nullptr; return arrow::Status::OK(); @@ -161,16 +161,17 @@ class RecordBatchReaderHead : public arrow::RecordBatchReader { } arrow::Status Close() override { - if (reader_) { + if (done_) { + return arrow::Status::OK(); + } else { + done_ = true; arrow::Status result = reader_->Close(); - reader_.reset(); return result; - } else { - return arrow::Status::OK(); } } private: + bool done_; std::shared_ptr schema_; std::shared_ptr reader_; int64_t num_rows_; From cb2031d7bc47c19aa14ca31e3e4315f7f1bbad6b Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 16 Dec 2022 14:17:17 -0400 Subject: [PATCH 2/3] use the IO thread pool for a RecordBatchReader input to an ExecPlan --- r/src/compute-exec.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 080805d75f0..1f23516f117 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -30,6 +30,15 @@ #include #include +// Can't find the header for this? +namespace arrow { +namespace io { +namespace internal { +arrow::internal::ThreadPool* GetIOThreadPool(); +} +} // namespace io +} // namespace arrow + namespace compute = ::arrow::compute; std::shared_ptr make_compute_options(std::string func_name, @@ -447,7 +456,7 @@ std::shared_ptr ExecNode_SourceNode( arrow::compute::SourceNodeOptions options{ /*output_schema=*/reader->schema(), /*generator=*/ValueOrStop( - compute::MakeReaderGenerator(reader, arrow::internal::GetCpuThreadPool()))}; + compute::MakeReaderGenerator(reader, arrow::io::internal::GetIOThreadPool()))}; return MakeExecNodeOrStop("source", plan.get(), {}, options); } From 854b156ca62c25780dbc94a4bacb230833940bff Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 2 Jan 2023 10:35:47 -0400 Subject: [PATCH 3/3] reference issue --- r/src/compute-exec.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 1f23516f117..cb35615e365 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -30,7 +30,7 @@ #include #include -// Can't find the header for this? +// GH-15151: Best path forward to make this available without a hack like this one namespace arrow { namespace io { namespace internal {