From 42e0c0d684e0b5e8d9012c5096cc562cd9bcd0f2 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 3 Jun 2021 15:24:57 +0200 Subject: [PATCH] simplify RTasks as hinted by @westonpace in https://github.com/apache/arrow/pull/9615#discussion_r644193980 --- r/src/r_to_arrow.cpp | 33 ++++++--------------------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index d0f4f3a6def..58d783831a5 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -75,6 +75,10 @@ class RTasks { // run the delayed tasks now for (auto& task : delayed_serial_tasks_) { status &= std::move(task)(); + if (!status.ok()) { + stop_source_.RequestStop(); + break; + } } // then wait for the parallel tasks to finish @@ -86,11 +90,10 @@ class RTasks { } void Append(bool parallel, Task&& task) { - StoppingTask stopping_task(stop_source_, std::move(task)); if (parallel && use_threads_) { - parallel_tasks_->Append(std::move(stopping_task)); + parallel_tasks_->Append(std::move(task)); } else { - delayed_serial_tasks_.push_back(std::move(stopping_task)); + delayed_serial_tasks_.push_back(std::move(task)); } } @@ -108,30 +111,6 @@ class RTasks { StopSource stop_source_; std::shared_ptr parallel_tasks_; std::vector delayed_serial_tasks_; - - private: - class StoppingTask { - public: - StoppingTask(StopSource stop_source, Task&& task) : task_(std::move(task)) {} - - Status operator()() { - Status status; - StopToken token = stop_source_.token(); - if (token.IsStopRequested()) { - status &= token.Poll(); - } else { - Status status = std::move(task_)(); - if (!status.ok()) { - stop_source_.RequestStop(); - } - } - return status; - } - - private: - StopSource stop_source_; - Task task_; - }; }; struct RConversionOptions {