diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 080805d75f0..cb35615e365 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -30,6 +30,15 @@ #include #include +// GH-15151: Best path forward to make this available without a hack like this one +namespace arrow { +namespace io { +namespace internal { +arrow::internal::ThreadPool* GetIOThreadPool(); +} +} // namespace io +} // namespace arrow + namespace compute = ::arrow::compute; std::shared_ptr make_compute_options(std::string func_name, @@ -447,7 +456,7 @@ std::shared_ptr ExecNode_SourceNode( arrow::compute::SourceNodeOptions options{ /*output_schema=*/reader->schema(), /*generator=*/ValueOrStop( - compute::MakeReaderGenerator(reader, arrow::internal::GetCpuThreadPool()))}; + compute::MakeReaderGenerator(reader, arrow::io::internal::GetIOThreadPool()))}; return MakeExecNodeOrStop("source", plan.get(), {}, options); } diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index 8e9df121748..9ea4d917016 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -128,12 +128,12 @@ class RecordBatchReaderHead : public arrow::RecordBatchReader { public: RecordBatchReaderHead(std::shared_ptr reader, int64_t num_rows) - : schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {} + : done_(false), schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {} std::shared_ptr schema() const override { return schema_; } arrow::Status ReadNext(std::shared_ptr* batch_out) override { - if (!reader_) { + if (done_) { // Close() has been called batch_out = nullptr; return arrow::Status::OK(); @@ -161,16 +161,17 @@ class RecordBatchReaderHead : public arrow::RecordBatchReader { } arrow::Status Close() override { - if (reader_) { + if (done_) { + return arrow::Status::OK(); + } else { + done_ = true; arrow::Status result = reader_->Close(); - reader_.reset(); return result; - } else { - return arrow::Status::OK(); } } private: + bool done_; std::shared_ptr schema_; std::shared_ptr reader_; int64_t num_rows_;