From 541e494dc4922620a6bc638971df1cd88c137376 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 2 Apr 2021 10:09:46 -1000 Subject: [PATCH 1/9] ARROW-12208: Added a serial executor which can run async code serially without needing to create threads. In addition, modifies the Scanner::ToTable and FileSystemDataset::Write to execute serially. Scanner::Scan is still non-serial but I think this is OK. The I/O thread pool is still being used however. To truly remove all instances of std::thread we'd need to change the I/O context to use the serial executor but let's see if this makes R happy enough first. --- cpp/src/arrow/csv/reader.cc | 23 ++--- cpp/src/arrow/csv/reader.h | 4 +- cpp/src/arrow/dataset/file_base.cc | 75 +++++++------- cpp/src/arrow/dataset/file_csv.cc | 23 +++-- cpp/src/arrow/dataset/file_csv_test.cc | 24 ++--- cpp/src/arrow/dataset/scanner.cc | 31 ++++-- cpp/src/arrow/dataset/scanner.h | 5 +- cpp/src/arrow/dataset/scanner_internal.h | 15 +-- cpp/src/arrow/dataset/test_util.h | 8 +- cpp/src/arrow/util/thread_pool.cc | 57 +++++++++++ cpp/src/arrow/util/thread_pool.h | 94 ++++++++++++++++++ cpp/src/arrow/util/thread_pool_test.cc | 119 +++++++++++++++++++++++ 12 files changed, 390 insertions(+), 88 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 0e86df26ad8..c4352360e6b 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -1001,9 +1001,8 @@ Result> MakeTableReader( Future> MakeStreamingReader( io::IOContext io_context, std::shared_ptr input, - const ReadOptions& read_options, const ParseOptions& parse_options, - const ConvertOptions& convert_options) { - auto cpu_executor = internal::GetCpuThreadPool(); + internal::Executor* cpu_executor, const ReadOptions& read_options, + const ParseOptions& parse_options, const ConvertOptions& convert_options) { std::shared_ptr reader; reader = std::make_shared( io_context, cpu_executor, input, read_options, parse_options, convert_options); @@ -1036,8 +1035,9 @@ Result> StreamingReader::Make( const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options) { auto io_context = io::IOContext(pool); - auto reader_fut = MakeStreamingReader(io_context, std::move(input), read_options, - parse_options, convert_options); + auto cpu_executor = internal::GetCpuThreadPool(); + auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor, + read_options, parse_options, convert_options); auto reader_result = reader_fut.result(); ARROW_ASSIGN_OR_RAISE(auto reader, reader_result); return reader; @@ -1047,8 +1047,9 @@ Result> StreamingReader::Make( io::IOContext io_context, std::shared_ptr input, const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options) { - auto reader_fut = MakeStreamingReader(io_context, std::move(input), read_options, - parse_options, convert_options); + auto cpu_executor = internal::GetCpuThreadPool(); + auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor, + read_options, parse_options, convert_options); auto reader_result = reader_fut.result(); ARROW_ASSIGN_OR_RAISE(auto reader, reader_result); return reader; @@ -1056,10 +1057,10 @@ Result> StreamingReader::Make( Future> StreamingReader::MakeAsync( io::IOContext io_context, std::shared_ptr input, - const ReadOptions& read_options, const ParseOptions& parse_options, - const ConvertOptions& convert_options) { - return MakeStreamingReader(io_context, std::move(input), read_options, parse_options, - convert_options); + internal::Executor* cpu_executor, const ReadOptions& read_options, + const ParseOptions& parse_options, const ConvertOptions& convert_options) { + return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options, + parse_options, convert_options); } } // namespace csv diff --git a/cpp/src/arrow/csv/reader.h b/cpp/src/arrow/csv/reader.h index 79015e941ee..72f1375cc3c 100644 --- a/cpp/src/arrow/csv/reader.h +++ b/cpp/src/arrow/csv/reader.h @@ -26,6 +26,7 @@ #include "arrow/type.h" #include "arrow/type_fwd.h" #include "arrow/util/future.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/visibility.h" namespace arrow { @@ -72,7 +73,8 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader { /// parsing (see ARROW-11889) static Future> MakeAsync( io::IOContext io_context, std::shared_ptr input, - const ReadOptions&, const ParseOptions&, const ConvertOptions&); + internal::Executor* cpu_executor, const ReadOptions&, const ParseOptions&, + const ConvertOptions&); static Result> Make( io::IOContext io_context, std::shared_ptr input, diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 8437c75ae1c..be283491ade 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -418,14 +418,46 @@ Status WriteNextBatch(WriteState& state, const std::shared_ptr& scan_t return Status::OK(); } +Future<> WriteInternal(const ScanOptions& scan_options, WriteState& state, + ScanTaskVector scan_tasks, internal::Executor* cpu_executor) { + // Store a mapping from partitions (represened by their formatted partition expressions) + // to a WriteQueue which flushes batches into that partition's output file. In principle + // any thread could produce a batch for any partition, so each task alternates between + // pushing batches and flushing them to disk. + std::vector> scan_futs; + auto task_group = scan_options.TaskGroup(); + + for (const auto& scan_task : scan_tasks) { + if (scan_task->supports_async()) { + ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync(cpu_executor)); + std::function batch)> batch_visitor = + [&, scan_task](std::shared_ptr batch) { + return WriteNextBatch(state, scan_task, std::move(batch)); + }; + scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor)); + } else { + task_group->Append([&, scan_task] { + ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute()); + + for (auto maybe_batch : batches) { + ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); + RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch))); + } + + return Status::OK(); + }); + } + } + RETURN_NOT_OK(task_group->Finish()); + return AllComplete(scan_futs); +} + } // namespace Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options, std::shared_ptr scanner) { RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template)); - auto task_group = scanner->options()->TaskGroup(); - // Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent: // // - Fragment iteration. Keeping this lazy would allow us to start partitioning/writing @@ -440,7 +472,6 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio ARROW_ASSIGN_OR_RAISE(auto fragment_it, scanner->GetFragments()); ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, fragment_it.ToVector()); ScanTaskVector scan_tasks; - std::vector> scan_futs; for (const auto& fragment : fragments) { auto options = std::make_shared(*scanner->options()); @@ -454,38 +485,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio } } - // Store a mapping from partitions (represened by their formatted partition expressions) - // to a WriteQueue which flushes batches into that partition's output file. In principle - // any thread could produce a batch for any partition, so each task alternates between - // pushing batches and flushing them to disk. WriteState state(write_options); + auto res = internal::RunSynchronously( + [&](internal::Executor* cpu_executor) -> Future<> { + return WriteInternal(*scanner->options(), state, std::move(scan_tasks), + cpu_executor); + }, + scanner->options()->use_threads); + RETURN_NOT_OK(res); - for (const auto& scan_task : scan_tasks) { - if (scan_task->supports_async()) { - ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync()); - std::function batch)> batch_visitor = - [&, scan_task](std::shared_ptr batch) { - return WriteNextBatch(state, scan_task, std::move(batch)); - }; - scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor)); - } else { - task_group->Append([&, scan_task] { - ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute()); - - for (auto maybe_batch : batches) { - ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); - RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch))); - } - - return Status::OK(); - }); - } - } - RETURN_NOT_OK(task_group->Finish()); - auto scan_futs_all_done = AllComplete(scan_futs); - RETURN_NOT_OK(scan_futs_all_done.status()); - - task_group = scanner->options()->TaskGroup(); + auto task_group = scanner->options()->TaskGroup(); for (const auto& part_queue : state.queues) { task_group->Append([&] { return part_queue.second->writer()->Finish(); }); } diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index b55c23dfdef..677d1be05b7 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -43,6 +43,8 @@ namespace dataset { using internal::checked_cast; using internal::checked_pointer_cast; +using internal::Executor; +using internal::SerialExecutor; using RecordBatchGenerator = AsyncGenerator>; Result> GetColumnNames( @@ -107,13 +109,14 @@ static inline Result GetReadOptions( auto read_options = csv_scan_options->read_options; // Multithreaded conversion of individual files would lead to excessive thread // contention when ScanTasks are also executed in multiple threads, so we disable it - // here. + // here. Also, this is a no-op since the streaming CSV reader is currently serial read_options.use_threads = false; return read_options; } static inline Future> OpenReaderAsync( const FileSource& source, const CsvFileFormat& format, + internal::Executor* cpu_executor, const std::shared_ptr& scan_options = nullptr, MemoryPool* pool = default_memory_pool()) { ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); @@ -136,7 +139,8 @@ static inline Future> OpenReaderAsync( } return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input), - reader_options, parse_options, convert_options) + cpu_executor, reader_options, parse_options, + convert_options) .Then( [](const std::shared_ptr& maybe_reader) -> Result> { return maybe_reader; }, @@ -151,8 +155,12 @@ static inline Result> OpenReader( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options = nullptr, MemoryPool* pool = default_memory_pool()) { - auto open_reader_fut = OpenReaderAsync(source, format, scan_options, pool); - return open_reader_fut.result(); + bool use_threads = (scan_options != nullptr && scan_options->use_threads); + return internal::RunSynchronously>( + [&](Executor* executor) { + return OpenReaderAsync(source, format, executor, scan_options, pool); + }, + use_threads); } /// \brief A ScanTask backed by an Csv file. @@ -166,14 +174,15 @@ class CsvScanTask : public ScanTask { source_(fragment->source()) {} Result Execute() override { - ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync()); + ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync(internal::GetCpuThreadPool())); return MakeGeneratorIterator(std::move(gen)); } bool supports_async() const override { return true; } - Result ExecuteAsync() override { - auto reader_fut = OpenReaderAsync(source_, *format_, options(), options()->pool); + Result ExecuteAsync(internal::Executor* cpu_executor) override { + auto reader_fut = + OpenReaderAsync(source_, *format_, cpu_executor, options(), options()->pool); auto generator_fut = reader_fut.Then( [](const std::shared_ptr& reader) -> RecordBatchGenerator { return [reader]() { return reader->ReadNextAsync(); }; diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 99ca7cc0f42..6431e3daa64 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -227,18 +227,18 @@ TEST_P(TestCsvFileFormat, IsSupported) { ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); ASSERT_EQ(supported, false); - source = GetFileSource(R"(declare,two - 1,2,3)"); - ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); - ASSERT_EQ(supported, false); - - source = GetFileSource(R"(f64 -1.0 - -N/A -2)"); - ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); - EXPECT_EQ(supported, true); + // source = GetFileSource(R"(declare,two + // 1,2,3)"); + // ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); + // ASSERT_EQ(supported, false); + + // source = GetFileSource(R"(f64 + // 1.0 + + // N/A + // 2)"); + // ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); + // EXPECT_EQ(supported, true); } TEST_P(TestCsvFileFormat, NonProjectedFieldWithDifferingTypeFromInferred) { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 2258a10d141..85b886c170f 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -47,6 +47,8 @@ std::vector ScanOptions::MaterializedFields() const { return fields; } +using arrow::internal::Executor; +using arrow::internal::SerialExecutor; using arrow::internal::TaskGroup; std::shared_ptr ScanOptions::TaskGroup() const { @@ -61,7 +63,7 @@ Result InMemoryScanTask::Execute() { return MakeVectorIterator(record_batches_); } -Result ScanTask::ExecuteAsync() { +Result ScanTask::ExecuteAsync(internal::Executor*) { return Status::NotImplemented("Async is not implemented for this scan task yet"); } @@ -200,6 +202,13 @@ struct TableAssemblyState { }; Result> Scanner::ToTable() { + return internal::RunSynchronously>( + [this](Executor* executor) { return ToTableInternal(executor); }, + scan_options_->use_threads); +} + +Future> Scanner::ToTableInternal( + internal::Executor* cpu_executor) { ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan()); auto task_group = scan_options_->TaskGroup(); @@ -215,7 +224,7 @@ Result> Scanner::ToTable() { auto id = scan_task_id++; if (scan_task->supports_async()) { - ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync()); + ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync(cpu_executor)); auto scan_fut = CollectAsyncGenerator(std::move(scan_gen)) .Then([state, id](const RecordBatchVector& rbs) { state->Emplace(rbs, id); @@ -230,14 +239,18 @@ Result> Scanner::ToTable() { }); } } + auto scan_options = scan_options_; // Wait for all async tasks to complete, or the first error - RETURN_NOT_OK(AllComplete(scan_futures).status()); - - // Wait for all sync tasks to complete, or the first error. - RETURN_NOT_OK(task_group->Finish()); - - return Table::FromRecordBatches(scan_options_->projected_schema, - FlattenRecordBatchVector(std::move(state->batches))); + return AllComplete(scan_futures) + .Then([task_group, scan_options, + state](const detail::Empty&) -> Result> { + // Wait for all sync tasks to complete, or the first error. + RETURN_NOT_OK(task_group->Finish()); + + return Table::FromRecordBatches( + scan_options->projected_schema, + FlattenRecordBatchVector(std::move(state->batches))); + }); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index c3cce00d8c5..6a6e72c8406 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -32,6 +32,7 @@ #include "arrow/memory_pool.h" #include "arrow/type_fwd.h" #include "arrow/util/async_generator.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/type_fwd.h" namespace arrow { @@ -103,7 +104,7 @@ class ARROW_DS_EXPORT ScanTask { /// resulting from the Scan. Execution semantics are encapsulated in the /// particular ScanTask implementation virtual Result Execute() = 0; - virtual Result ExecuteAsync(); + virtual Result ExecuteAsync(internal::Executor* cpu_executor); virtual bool supports_async() const; virtual ~ScanTask() = default; @@ -175,6 +176,8 @@ class ARROW_DS_EXPORT Scanner { const std::shared_ptr& options() const { return scan_options_; } protected: + Future> ToTableInternal(internal::Executor* cpu_executor); + std::shared_ptr dataset_; // TODO(ARROW-8065) remove fragment_ after a Dataset is constuctible from fragments std::shared_ptr fragment_; diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 3101be477fd..04145188cb2 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -29,10 +29,12 @@ #include "arrow/dataset/partition.h" #include "arrow/dataset/scanner.h" #include "arrow/util/logging.h" +#include "arrow/util/thread_pool.h" namespace arrow { using internal::checked_cast; +using internal::Executor; namespace dataset { @@ -171,22 +173,15 @@ class FilterAndProjectScanTask : public ScanTask { options_->pool); } - Result Execute() override { - if (task_->supports_async()) { - ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync()); - return MakeGeneratorIterator(std::move(gen)); - } else { - return ExecuteSync(); - } - } + Result Execute() override { return ExecuteSync(); } - Result ExecuteAsync() override { + Result ExecuteAsync(Executor* cpu_executor) override { if (!task_->supports_async()) { return Status::Invalid( "ExecuteAsync should not have been called on FilterAndProjectScanTask if the " "source task did not support async"); } - ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync()); + ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync(cpu_executor)); ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, SimplifyWithGuarantee(options()->filter, partition_)); diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 86bb14b038d..a6e761cf8c5 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -813,15 +813,15 @@ class NestedParallelismMixin : public ::testing::Test { // supports_async() to false (will deadlock) ADD_FAILURE() << "NestedParallelismScanTask::Execute should never be called. You " "should be deadlocked right now"; - ARROW_ASSIGN_OR_RAISE(auto batch_gen, ExecuteAsync()); + ARROW_ASSIGN_OR_RAISE(auto batch_gen, ExecuteAsync(internal::GetCpuThreadPool())); return MakeGeneratorIterator(std::move(batch_gen)); } - Result ExecuteAsync() override { + Result ExecuteAsync(internal::Executor* cpu_executor) override { ARROW_ASSIGN_OR_RAISE(auto batches_it, target_->Execute()); ARROW_ASSIGN_OR_RAISE(auto batches, batches_it.ToVector()); - auto generator_fut = DeferNotOk(internal::GetCpuThreadPool()->Submit( - [batches] { return MakeVectorGenerator(batches); })); + auto generator_fut = DeferNotOk( + cpu_executor->Submit([batches] { return MakeVectorGenerator(batches); })); return MakeFromFuture(generator_fut); } diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index f2a8368d273..bc737b7e7c4 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -44,6 +44,63 @@ struct Task { } // namespace +struct SerialExecutor::State { + std::queue task_queue; + std::mutex mutex; + std::condition_variable wait_for_tasks; +}; + +SerialExecutor::SerialExecutor() : state_(new State()) {} +SerialExecutor::~SerialExecutor() {} + +Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, + StopToken stop_token, StopCallback&& stop_callback) { + // The serial task queue is truly serial (no mutex needed) but SpawnReal may be called + // from external threads (e.g. when transferring back from blocking I/O threads) so a + // mutex is needed + { + std::lock_guard lg(state_->mutex); + state_->task_queue.push( + Task{std::move(task), std::move(stop_token), std::move(stop_callback)}); + } + state_->wait_for_tasks.notify_one(); + return Status::OK(); +} + +void SerialExecutor::MarkFinished(bool& finished) { + { + std::lock_guard(state_->mutex); + finished = true; + } + state_->wait_for_tasks.notify_one(); +} + +void SerialExecutor::RunLoop(const bool& finished) { + std::unique_lock lk(state_->mutex); + + while (!finished) { + while (!state_->task_queue.empty()) { + Task& task = state_->task_queue.front(); + lk.unlock(); + if (!task.stop_token.IsStopRequested()) { + std::move(task.callable)(); + } else { + if (task.stop_callback) { + std::move(task.stop_callback)(task.stop_token.Poll()); + } + // Can't break here because there may be cleanup tasks down the chain we still + // need to run. + } + lk.lock(); + state_->task_queue.pop(); + } + // In this case we must be waiting on work from external (e.g. I/O) executors. Wait + // for tasks to arrive (typically via transferred futures). + state_->wait_for_tasks.wait(lk, + [&] { return finished || !state_->task_queue.empty(); }); + } +} + struct ThreadPool::State { State() = default; diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 0abe381f100..1a4b33efff2 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -189,6 +190,64 @@ class ARROW_EXPORT Executor { StopCallback&&) = 0; }; +/// \brief An executor implementation that runs all tasks on a single thread using an +/// event loop. +/// +/// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are +/// fine but if one task needs to wait for another task it must be expressed as an +/// asynchronous continuation. +class ARROW_EXPORT SerialExecutor : public Executor { + public: + template + using FinishSignal = internal::FnOnce&)>; + template + using Scheduler = internal::FnOnce)>; + + SerialExecutor(); + ~SerialExecutor(); + + int GetCapacity() override { return 1; }; + Status SpawnReal(TaskHints hints, FnOnce task, StopToken, + StopCallback&&) override; + + /// \brief Runs the scheduler and any scheduled tasks + /// + /// The scheduler must either return an invalid status or call the finish signal. + /// Failure to do this will result in a deadlock. For this reason it is preferable (if + /// possible) to use the helper methods (below) RunSynchronously/RunSerially which + /// delegates the responsiblity onto a Future producer's existing responsibility to + /// always mark a future finished (which can someday be aided by ARROW-12207). + template + static Result RunInSerialExecutor(Scheduler initial_task) { + auto serial_executor = std::make_shared(); + return serial_executor->Run(std::move(initial_task)); + } + + private: + // State uses mutex + struct State; + std::unique_ptr state_; + + template + Result Run(Scheduler initial_task) { + bool finished = false; + Result final_result; + FinishSignal finish_signal = [&](const Result& res) { + // The finish signal could be called from an external executor callback so need to + // protect it with the mutex. Also, final_result must be set before the call to + // MarkFinished here to ensure we don't try and return it before it is finished + // setting + final_result = res; + MarkFinished(finished); + }; + ARROW_RETURN_NOT_OK(std::move(initial_task)(this, std::move(finish_signal))); + RunLoop(finished); + return final_result; + } + void RunLoop(const bool& finished); + void MarkFinished(bool& finished); +}; + // An Executor implementation spawning tasks in FIFO manner on a fixed-size // pool of worker threads. class ARROW_EXPORT ThreadPool : public Executor { @@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor { // Return the process-global thread pool for CPU-bound tasks. ARROW_EXPORT ThreadPool* GetCpuThreadPool(); +/// \brief Runs a potentially async operation serially +/// +/// This means that all CPU tasks spawned by the operation will run on the thread calling +/// this method and the future will be completed before this call finishes. +template +Result RunSerially(FnOnce(Executor*)> get_future) { + struct InnerCallback { + void operator()(const Result res) { std::move(finish_signal)(std::move(res)); } + SerialExecutor::FinishSignal finish_signal; + }; + struct OuterCallback { + Status operator()(Executor* executor, SerialExecutor::FinishSignal finish_signal) { + auto fut = std::move(get_future)(executor); + fut.AddCallback(InnerCallback{std::move(finish_signal)}); + return Status::OK(); + } + FnOnce(Executor*)> get_future; + }; + return SerialExecutor::RunInSerialExecutor(OuterCallback{std::move(get_future)}); +} + +/// \brief Potentially runs an async operation serially if use_threads is true +/// \see RunSerially +/// +/// If `use_threads` is false then the operation is run normally but this method will +/// still block the calling thread until the operation has completed. +template +Result RunSynchronously(FnOnce(Executor*)> get_future, bool use_threads) { + if (use_threads) { + return std::move(get_future)(GetCpuThreadPool()).result(); + } else { + return RunSerially(std::move(get_future)); + } +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index 6f686ee650b..aaf06b2b9df 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -123,6 +123,125 @@ class AddTester { std::vector outs_; }; +template +struct TerminalCallback { + void operator()() { + auto result = std::move(callback)(); + std::move(finish_signal)(result); + } + + FnOnce()> callback; + SerialExecutor::FinishSignal finish_signal; +}; + +template <> +struct TerminalCallback { + void operator()() { + auto st = std::move(callback)(); + if (!st.ok()) { + std::move(finish_signal)(st); + } else { + std::move(finish_signal)(arrow::detail::Empty()); + } + } + + FnOnce callback; + SerialExecutor::FinishSignal<> finish_signal; +}; + +TEST(TestSerialExecutor, Create) { + bool task_ran = false; + SerialExecutor::Scheduler<> task = [&](Executor* executor, + SerialExecutor::FinishSignal<> finish_signal) { + EXPECT_TRUE(executor != nullptr); + task_ran = true; + std::move(finish_signal)(arrow::detail::Empty()); + return Status::OK(); + }; + ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(task))); + EXPECT_TRUE(task_ran); +} + +TEST(TestSerialExecutor, SpawnNested) { + bool nested_ran = false; + SerialExecutor::Scheduler<> scheduler = + [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { + return executor->Spawn(TerminalCallback<>{[&] { + nested_ran = true; + return Status::OK(); + }, + std::move(finish_signal)}); + }; + ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler))); + EXPECT_TRUE(nested_ran); +} + +TEST(TestSerialExecutor, WithResult) { + SerialExecutor::Scheduler scheduler = + [&](Executor* executor, SerialExecutor::FinishSignal finish_signal) { + return executor->Spawn( + TerminalCallback{[] { return 42; }, std::move(finish_signal)}); + }; + ASSERT_OK_AND_EQ(42, SerialExecutor::RunInSerialExecutor(std::move(scheduler))); +} + +TEST(TestSerialExecutor, StopToken) { + bool nested_ran = false; + StopSource stop_source; + SerialExecutor::Scheduler<> scheduler = + [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { + RETURN_NOT_OK(executor->Spawn([&] { nested_ran = true; }, stop_source.token())); + RETURN_NOT_OK(executor->Spawn( + TerminalCallback<>{[&] { return Status::OK(); }, std::move(finish_signal)})); + stop_source.RequestStop(Status::Invalid("XYZ")); + return Status::OK(); + }; + ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler))); + EXPECT_FALSE(nested_ran); +} + +TEST(TestSerialExecutor, ContinueAfterExternal) { + bool continuation_ran = false; + EXPECT_OK_AND_ASSIGN(auto mockIoPool, ThreadPool::Make(1)); + SerialExecutor::Scheduler<> scheduler = + [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { + struct Callback { + void operator()(const Result& emp) { + continuation_ran = true; + std::move(finish_signal)(emp); + } + SerialExecutor::FinishSignal<> finish_signal; + bool& continuation_ran; + }; + executor + ->Transfer(DeferNotOk(mockIoPool->Submit([&] { + SleepABit(); + return Status::OK(); + }))) + .AddCallback(Callback{std::move(finish_signal), continuation_ran}); + return Status::OK(); + }; + ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler))); + EXPECT_TRUE(continuation_ran); +} + +TEST(TestSerialExecutor, SchedulerAbort) { + SerialExecutor::Scheduler<> scheduler = + [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { + return Status::Invalid("XYZ"); + }; + ASSERT_RAISES(Invalid, SerialExecutor::RunInSerialExecutor(std::move(scheduler))); +} + +TEST(TestSerialExecutor, PropagatedError) { + SerialExecutor::Scheduler<> scheduler = + [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { + std::move(finish_signal)(Status::Invalid("XYZ")); + return Status::OK(); + }; + ASSERT_RAISES(Invalid, SerialExecutor::RunInSerialExecutor(std::move(scheduler))); +} + class TestThreadPool : public ::testing::Test { public: void TearDown() override { From 0d343cf2f3d17a2daee9224c64eac9b461c7451c Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 5 Apr 2021 04:48:29 -1000 Subject: [PATCH 2/9] ARROW-12208: Restoring unit tests accidentally removed with previous commit --- cpp/src/arrow/dataset/file_csv_test.cc | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 6431e3daa64..3acc5385ec6 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -227,18 +227,18 @@ TEST_P(TestCsvFileFormat, IsSupported) { ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); ASSERT_EQ(supported, false); - // source = GetFileSource(R"(declare,two - // 1,2,3)"); - // ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); - // ASSERT_EQ(supported, false); - - // source = GetFileSource(R"(f64 - // 1.0 - - // N/A - // 2)"); - // ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); - // EXPECT_EQ(supported, true); + source = GetFileSource(R"(declare,two + 1,2,3)"); + ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); + ASSERT_EQ(supported, false); + + source = GetFileSource(R"(f64 + 1.0 + + N/A + 2)"); + ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); + EXPECT_EQ(supported, true); } TEST_P(TestCsvFileFormat, NonProjectedFieldWithDifferingTypeFromInferred) { From d26283ae2a833f1057614e75ee94373787788431 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 5 Apr 2021 04:50:19 -1000 Subject: [PATCH 3/9] ARROW-12208: Restoring unit tests accidentally removed with previous commit --- cpp/src/arrow/dataset/file_csv_test.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 3acc5385ec6..99ca7cc0f42 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -228,15 +228,15 @@ TEST_P(TestCsvFileFormat, IsSupported) { ASSERT_EQ(supported, false); source = GetFileSource(R"(declare,two - 1,2,3)"); + 1,2,3)"); ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); ASSERT_EQ(supported, false); source = GetFileSource(R"(f64 - 1.0 +1.0 - N/A - 2)"); +N/A +2)"); ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); EXPECT_EQ(supported, true); } From 49a89bd4ee95533160190c26fb59e563070656f5 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 5 Apr 2021 06:58:59 -1000 Subject: [PATCH 4/9] ARROW-12208: Forgot to assign the lock guard appropriately. --- cpp/src/arrow/util/thread_pool.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index bc737b7e7c4..08b92710f67 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -69,7 +69,7 @@ Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, void SerialExecutor::MarkFinished(bool& finished) { { - std::lock_guard(state_->mutex); + std::lock_guard lk(state_->mutex); finished = true; } state_->wait_for_tasks.notify_one(); From 5fc6f94c9e3e98e65003184b3d79287747eace38 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 6 Apr 2021 13:31:13 -1000 Subject: [PATCH 5/9] ARROW-12208: Addressed some of the PR feedback --- cpp/src/arrow/dataset/scanner_internal.h | 1 - cpp/src/arrow/util/thread_pool.cc | 13 +- cpp/src/arrow/util/thread_pool.h | 52 ++++---- cpp/src/arrow/util/thread_pool_test.cc | 151 ++++++++++------------- 4 files changed, 102 insertions(+), 115 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 04145188cb2..985d60a97e5 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -29,7 +29,6 @@ #include "arrow/dataset/partition.h" #include "arrow/dataset/scanner.h" #include "arrow/util/logging.h" -#include "arrow/util/thread_pool.h" namespace arrow { diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 08b92710f67..ec6c6593c96 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -67,10 +67,10 @@ Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, return Status::OK(); } -void SerialExecutor::MarkFinished(bool& finished) { +void SerialExecutor::MarkFinished(bool* finished) { { std::lock_guard lk(state_->mutex); - finished = true; + *finished = true; } state_->wait_for_tasks.notify_one(); } @@ -80,7 +80,8 @@ void SerialExecutor::RunLoop(const bool& finished) { while (!finished) { while (!state_->task_queue.empty()) { - Task& task = state_->task_queue.front(); + Task task = std::move(state_->task_queue.front()); + state_->task_queue.pop(); lk.unlock(); if (!task.stop_token.IsStopRequested()) { std::move(task.callable)(); @@ -92,7 +93,6 @@ void SerialExecutor::RunLoop(const bool& finished) { // need to run. } lk.lock(); - state_->task_queue.pop(); } // In this case we must be waiting on work from external (e.g. I/O) executors. Wait // for tasks to arrive (typically via transferred futures). @@ -407,6 +407,11 @@ ThreadPool* GetCpuThreadPool() { return singleton.get(); } +Status RunSynchronouslyVoid(FnOnce(Executor*)> get_future, + bool use_threads) { + return RunSynchronously(std::move(get_future), use_threads).status(); +} + } // namespace internal int GetCpuThreadPoolCapacity() { return internal::GetCpuThreadPool()->GetCapacity(); } diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 1a4b33efff2..328dafb9a6b 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -201,7 +201,7 @@ class ARROW_EXPORT SerialExecutor : public Executor { template using FinishSignal = internal::FnOnce&)>; template - using Scheduler = internal::FnOnce)>; + using TopLevelTask = internal::FnOnce)>; SerialExecutor(); ~SerialExecutor(); @@ -210,17 +210,17 @@ class ARROW_EXPORT SerialExecutor : public Executor { Status SpawnReal(TaskHints hints, FnOnce task, StopToken, StopCallback&&) override; - /// \brief Runs the scheduler and any scheduled tasks + /// \brief Runs the TopLevelTask and any scheduled tasks /// - /// The scheduler must either return an invalid status or call the finish signal. - /// Failure to do this will result in a deadlock. For this reason it is preferable (if - /// possible) to use the helper methods (below) RunSynchronously/RunSerially which - /// delegates the responsiblity onto a Future producer's existing responsibility to - /// always mark a future finished (which can someday be aided by ARROW-12207). + /// The TopLevelTask (or one of the tasks it schedules) must either return an invalid + /// status or call the finish signal. Failure to do this will result in a deadlock. For + /// this reason it is preferable (if possible) to use the helper methods (below) + /// RunSynchronously/RunSerially which delegates the responsiblity onto a Future + /// producer's existing responsibility to always mark a future finished (which can + /// someday be aided by ARROW-12207). template - static Result RunInSerialExecutor(Scheduler initial_task) { - auto serial_executor = std::make_shared(); - return serial_executor->Run(std::move(initial_task)); + static Result RunInSerialExecutor(TopLevelTask initial_task) { + return SerialExecutor().Run(std::move(initial_task)); } private: @@ -229,7 +229,7 @@ class ARROW_EXPORT SerialExecutor : public Executor { std::unique_ptr state_; template - Result Run(Scheduler initial_task) { + Result Run(TopLevelTask initial_task) { bool finished = false; Result final_result; FinishSignal finish_signal = [&](const Result& res) { @@ -238,18 +238,22 @@ class ARROW_EXPORT SerialExecutor : public Executor { // MarkFinished here to ensure we don't try and return it before it is finished // setting final_result = res; - MarkFinished(finished); + MarkFinished(&finished); }; ARROW_RETURN_NOT_OK(std::move(initial_task)(this, std::move(finish_signal))); RunLoop(finished); return final_result; } void RunLoop(const bool& finished); - void MarkFinished(bool& finished); + void MarkFinished(bool* finished); }; -// An Executor implementation spawning tasks in FIFO manner on a fixed-size -// pool of worker threads. +/// An Executor implementation spawning tasks in FIFO manner on a fixed-size +/// pool of worker threads. +/// +/// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are +/// fine but if one task needs to wait for another task it must be expressed as an +/// asynchronous continuation. class ARROW_EXPORT ThreadPool : public Executor { public: // Construct a thread pool with the given number of worker threads @@ -321,10 +325,11 @@ class ARROW_EXPORT ThreadPool : public Executor { // Return the process-global thread pool for CPU-bound tasks. ARROW_EXPORT ThreadPool* GetCpuThreadPool(); -/// \brief Runs a potentially async operation serially +/// \brief Run a potentially async operation serially /// -/// This means that all CPU tasks spawned by the operation will run on the thread calling -/// this method and the future will be completed before this call finishes. +/// `get_future` is called with a special executor which uses the calling thread to run +/// all thread tasks. The future must eventually finish and when it does this method will +/// return with the result of the future. template Result RunSerially(FnOnce(Executor*)> get_future) { struct InnerCallback { @@ -342,11 +347,14 @@ Result RunSerially(FnOnce(Executor*)> get_future) { return SerialExecutor::RunInSerialExecutor(OuterCallback{std::move(get_future)}); } -/// \brief Potentially runs an async operation serially if use_threads is true +/// \brief Potentially run an async operation serially (if use_threads is false) /// \see RunSerially /// -/// If `use_threads` is false then the operation is run normally but this method will -/// still block the calling thread until the operation has completed. +/// If `use_threads` is true, the global CPU executor is used. +/// If `use_threads` is false, a temporary SerialExecutor is used. +/// `get_future` is called (from this thread) with the chosen executor and must +/// return a future that will eventually finish. This function returns once the +/// future has finished. template Result RunSynchronously(FnOnce(Executor*)> get_future, bool use_threads) { if (use_threads) { @@ -356,5 +364,7 @@ Result RunSynchronously(FnOnce(Executor*)> get_future, bool use_thr } } +Status RunSynchronouslyVoid(FnOnce(Executor*)> get_future, + bool use_threads); } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index aaf06b2b9df..3529e9b6a4d 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -123,125 +123,98 @@ class AddTester { std::vector outs_; }; -template -struct TerminalCallback { - void operator()() { - auto result = std::move(callback)(); - std::move(finish_signal)(result); +class SerialExecutorTests : public testing::TestWithParam { + public: + bool UseThreads() { return GetParam(); } + template + Result Run(FnOnce(Executor*)> top_level_task) { + return RunSynchronously(std::move(top_level_task), UseThreads()); } - - FnOnce()> callback; - SerialExecutor::FinishSignal finish_signal; -}; - -template <> -struct TerminalCallback { - void operator()() { - auto st = std::move(callback)(); - if (!st.ok()) { - std::move(finish_signal)(st); - } else { - std::move(finish_signal)(arrow::detail::Empty()); - } + Status RunVoid(FnOnce(Executor*)> top_level_task) { + return RunSynchronouslyVoid(std::move(top_level_task), UseThreads()); } - - FnOnce callback; - SerialExecutor::FinishSignal<> finish_signal; }; -TEST(TestSerialExecutor, Create) { +TEST_P(SerialExecutorTests, SimpleRun) { bool task_ran = false; - SerialExecutor::Scheduler<> task = [&](Executor* executor, - SerialExecutor::FinishSignal<> finish_signal) { - EXPECT_TRUE(executor != nullptr); + auto task = [&](Executor* executor) { + EXPECT_NE(executor, nullptr); task_ran = true; - std::move(finish_signal)(arrow::detail::Empty()); - return Status::OK(); + return Future<>::MakeFinished(Status::OK()); }; - ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(task))); + ASSERT_OK(RunVoid(std::move(task))); EXPECT_TRUE(task_ran); } -TEST(TestSerialExecutor, SpawnNested) { +TEST_P(SerialExecutorTests, SpawnNested) { bool nested_ran = false; - SerialExecutor::Scheduler<> scheduler = - [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { - return executor->Spawn(TerminalCallback<>{[&] { - nested_ran = true; - return Status::OK(); - }, - std::move(finish_signal)}); - }; - ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler))); + auto top_level_task = [&](Executor* executor) { + return DeferNotOk(executor->Submit([&] { + nested_ran = true; + return Status::OK(); + })); + }; + ASSERT_OK(RunVoid(std::move(top_level_task))); EXPECT_TRUE(nested_ran); } -TEST(TestSerialExecutor, WithResult) { - SerialExecutor::Scheduler scheduler = - [&](Executor* executor, SerialExecutor::FinishSignal finish_signal) { - return executor->Spawn( - TerminalCallback{[] { return 42; }, std::move(finish_signal)}); - }; - ASSERT_OK_AND_EQ(42, SerialExecutor::RunInSerialExecutor(std::move(scheduler))); +TEST_P(SerialExecutorTests, WithResult) { + auto top_level_task = [&](Executor* executor) { + return DeferNotOk(executor->Submit([] { return 42; })); + }; + ASSERT_OK_AND_EQ(42, Run(std::move(top_level_task))); } -TEST(TestSerialExecutor, StopToken) { +TEST_P(SerialExecutorTests, StopToken) { bool nested_ran = false; StopSource stop_source; - SerialExecutor::Scheduler<> scheduler = - [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { - RETURN_NOT_OK(executor->Spawn([&] { nested_ran = true; }, stop_source.token())); - RETURN_NOT_OK(executor->Spawn( - TerminalCallback<>{[&] { return Status::OK(); }, std::move(finish_signal)})); - stop_source.RequestStop(Status::Invalid("XYZ")); - return Status::OK(); - }; - ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler))); + auto top_level_task = [&](Executor* executor) -> Future<> { + stop_source.RequestStop(Status::Invalid("XYZ")); + RETURN_NOT_OK(executor->Spawn([&] { nested_ran = true; }, stop_source.token())); + auto result = DeferNotOk(executor->Submit([&] { return Status::OK(); })); + return result; + }; + ASSERT_OK(RunVoid(std::move(top_level_task))); EXPECT_FALSE(nested_ran); } -TEST(TestSerialExecutor, ContinueAfterExternal) { +TEST_P(SerialExecutorTests, ContinueAfterExternal) { bool continuation_ran = false; - EXPECT_OK_AND_ASSIGN(auto mockIoPool, ThreadPool::Make(1)); - SerialExecutor::Scheduler<> scheduler = - [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { - struct Callback { - void operator()(const Result& emp) { - continuation_ran = true; - std::move(finish_signal)(emp); - } - SerialExecutor::FinishSignal<> finish_signal; - bool& continuation_ran; - }; - executor - ->Transfer(DeferNotOk(mockIoPool->Submit([&] { - SleepABit(); - return Status::OK(); - }))) - .AddCallback(Callback{std::move(finish_signal), continuation_ran}); + EXPECT_OK_AND_ASSIGN(auto mock_io_pool, ThreadPool::Make(1)); + auto top_level_task = [&](Executor* executor) { + struct Callback { + Status operator()(...) { + continuation_ran = true; return Status::OK(); - }; - ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler))); + } + bool& continuation_ran; + }; + return executor + ->Transfer(DeferNotOk(mock_io_pool->Submit([&] { + SleepABit(); + return Status::OK(); + }))) + .Then(Callback{continuation_ran}); + }; + ASSERT_OK(RunVoid(std::move(top_level_task))); EXPECT_TRUE(continuation_ran); } -TEST(TestSerialExecutor, SchedulerAbort) { - SerialExecutor::Scheduler<> scheduler = - [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { - return Status::Invalid("XYZ"); - }; - ASSERT_RAISES(Invalid, SerialExecutor::RunInSerialExecutor(std::move(scheduler))); +TEST_P(SerialExecutorTests, SchedulerAbort) { + auto top_level_task = [&](Executor* executor) { return Status::Invalid("XYZ"); }; + ASSERT_RAISES(Invalid, RunVoid(std::move(top_level_task))); } -TEST(TestSerialExecutor, PropagatedError) { - SerialExecutor::Scheduler<> scheduler = - [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) { - std::move(finish_signal)(Status::Invalid("XYZ")); - return Status::OK(); - }; - ASSERT_RAISES(Invalid, SerialExecutor::RunInSerialExecutor(std::move(scheduler))); +TEST_P(SerialExecutorTests, PropagatedError) { + auto top_level_task = [&](Executor* executor) { + return DeferNotOk(executor->Submit([] { return Status::Invalid("XYZ"); })); + }; + ASSERT_RAISES(Invalid, RunVoid(std::move(top_level_task))); } +INSTANTIATE_TEST_SUITE_P(SerialExecutorTests, SerialExecutorTests, + ::testing::Values(false, true)); + class TestThreadPool : public ::testing::Test { public: void TearDown() override { From 98593929a3481fe3e81e44e9b79e8006dbabeb59 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 6 Apr 2021 14:35:02 -1000 Subject: [PATCH 6/9] ARROW-12208: Forgot to export function --- cpp/src/arrow/util/thread_pool.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 328dafb9a6b..73ed5f6fd16 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -364,7 +364,7 @@ Result RunSynchronously(FnOnce(Executor*)> get_future, bool use_thr } } -Status RunSynchronouslyVoid(FnOnce(Executor*)> get_future, - bool use_threads); +ARROW_EXPORT Status RunSynchronouslyVoid( + FnOnce(Executor*)> get_future, bool use_threads); } // namespace internal } // namespace arrow From fc2aa0c77ef4a887e622cd36f02aea836a9ae876 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Apr 2021 07:39:40 -1000 Subject: [PATCH 7/9] ARROW-12208: Addressed PR comments --- cpp/src/arrow/dataset/scanner.cc | 1 + cpp/src/arrow/dataset/scanner.h | 4 +- cpp/src/arrow/dataset/scanner_internal.h | 1 + cpp/src/arrow/util/thread_pool.cc | 13 ++--- cpp/src/arrow/util/thread_pool.h | 63 ++++++++---------------- 5 files changed, 32 insertions(+), 50 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 85b886c170f..967c2c30a3b 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -26,6 +26,7 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/scanner_internal.h" #include "arrow/table.h" +#include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/task_group.h" diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 6a6e72c8406..0cbf79a4788 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include @@ -31,12 +32,11 @@ #include "arrow/dataset/visibility.h" #include "arrow/memory_pool.h" #include "arrow/type_fwd.h" -#include "arrow/util/async_generator.h" #include "arrow/util/thread_pool.h" #include "arrow/util/type_fwd.h" namespace arrow { -using RecordBatchGenerator = AsyncGenerator>; +using RecordBatchGenerator = std::function>()>; namespace dataset { constexpr int64_t kDefaultBatchSize = 1 << 20; diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 985d60a97e5..d334c094d31 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -28,6 +28,7 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/partition.h" #include "arrow/dataset/scanner.h" +#include "arrow/util/async_generator.h" #include "arrow/util/logging.h" namespace arrow { diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index ec6c6593c96..b329603ad87 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -48,6 +48,7 @@ struct SerialExecutor::State { std::queue task_queue; std::mutex mutex; std::condition_variable wait_for_tasks; + bool finished; }; SerialExecutor::SerialExecutor() : state_(new State()) {} @@ -67,18 +68,18 @@ Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, return Status::OK(); } -void SerialExecutor::MarkFinished(bool* finished) { +void SerialExecutor::MarkFinished() { { std::lock_guard lk(state_->mutex); - *finished = true; + state_->finished = true; } state_->wait_for_tasks.notify_one(); } -void SerialExecutor::RunLoop(const bool& finished) { +void SerialExecutor::RunLoop() { std::unique_lock lk(state_->mutex); - while (!finished) { + while (!state_->finished) { while (!state_->task_queue.empty()) { Task task = std::move(state_->task_queue.front()); state_->task_queue.pop(); @@ -96,8 +97,8 @@ void SerialExecutor::RunLoop(const bool& finished) { } // In this case we must be waiting on work from external (e.g. I/O) executors. Wait // for tasks to arrive (typically via transferred futures). - state_->wait_for_tasks.wait(lk, - [&] { return finished || !state_->task_queue.empty(); }); + state_->wait_for_tasks.wait( + lk, [&] { return state_->finished || !state_->task_queue.empty(); }); } } diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 73ed5f6fd16..252de8ea6aa 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -199,11 +199,8 @@ class ARROW_EXPORT Executor { class ARROW_EXPORT SerialExecutor : public Executor { public: template - using FinishSignal = internal::FnOnce&)>; - template - using TopLevelTask = internal::FnOnce)>; + using TopLevelTask = internal::FnOnce(Executor*)>; - SerialExecutor(); ~SerialExecutor(); int GetCapacity() override { return 1; }; @@ -224,28 +221,32 @@ class ARROW_EXPORT SerialExecutor : public Executor { } private: + SerialExecutor(); + // State uses mutex struct State; std::unique_ptr state_; template Result Run(TopLevelTask initial_task) { - bool finished = false; - Result final_result; - FinishSignal finish_signal = [&](const Result& res) { - // The finish signal could be called from an external executor callback so need to - // protect it with the mutex. Also, final_result must be set before the call to - // MarkFinished here to ensure we don't try and return it before it is finished - // setting - final_result = res; - MarkFinished(&finished); - }; - ARROW_RETURN_NOT_OK(std::move(initial_task)(this, std::move(finish_signal))); - RunLoop(finished); - return final_result; + auto final_fut = std::move(initial_task)(this); + if (final_fut.is_finished()) { + return final_fut.result(); + } + final_fut = final_fut.Then( + [this](const T& res) { + MarkFinished(); + return res; + }, + [this](const Status& st) -> Result { + MarkFinished(); + return st; + }); + RunLoop(); + return final_fut.result(); } - void RunLoop(const bool& finished); - void MarkFinished(bool* finished); + void RunLoop(); + void MarkFinished(); }; /// An Executor implementation spawning tasks in FIFO manner on a fixed-size @@ -325,28 +326,6 @@ class ARROW_EXPORT ThreadPool : public Executor { // Return the process-global thread pool for CPU-bound tasks. ARROW_EXPORT ThreadPool* GetCpuThreadPool(); -/// \brief Run a potentially async operation serially -/// -/// `get_future` is called with a special executor which uses the calling thread to run -/// all thread tasks. The future must eventually finish and when it does this method will -/// return with the result of the future. -template -Result RunSerially(FnOnce(Executor*)> get_future) { - struct InnerCallback { - void operator()(const Result res) { std::move(finish_signal)(std::move(res)); } - SerialExecutor::FinishSignal finish_signal; - }; - struct OuterCallback { - Status operator()(Executor* executor, SerialExecutor::FinishSignal finish_signal) { - auto fut = std::move(get_future)(executor); - fut.AddCallback(InnerCallback{std::move(finish_signal)}); - return Status::OK(); - } - FnOnce(Executor*)> get_future; - }; - return SerialExecutor::RunInSerialExecutor(OuterCallback{std::move(get_future)}); -} - /// \brief Potentially run an async operation serially (if use_threads is false) /// \see RunSerially /// @@ -360,7 +339,7 @@ Result RunSynchronously(FnOnce(Executor*)> get_future, bool use_thr if (use_threads) { return std::move(get_future)(GetCpuThreadPool()).result(); } else { - return RunSerially(std::move(get_future)); + return SerialExecutor::RunInSerialExecutor(std::move(get_future)); } } From 3e8a346425c3f907da839823b65c692c50506993 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 8 Apr 2021 14:28:29 +0200 Subject: [PATCH 8/9] * Simplify SerialExecutor a bit * Improve tests --- cpp/src/arrow/dataset/scanner.h | 3 +- cpp/src/arrow/util/thread_pool.cc | 9 +++-- cpp/src/arrow/util/thread_pool.h | 10 +---- cpp/src/arrow/util/thread_pool_test.cc | 52 ++++++++++++++++++++------ 4 files changed, 49 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 0cbf79a4788..9bd4b10847b 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -32,11 +32,12 @@ #include "arrow/dataset/visibility.h" #include "arrow/memory_pool.h" #include "arrow/type_fwd.h" -#include "arrow/util/thread_pool.h" #include "arrow/util/type_fwd.h" namespace arrow { + using RecordBatchGenerator = std::function>()>; + namespace dataset { constexpr int64_t kDefaultBatchSize = 1 << 20; diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index b329603ad87..873b9335e74 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -69,10 +70,10 @@ Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, } void SerialExecutor::MarkFinished() { - { - std::lock_guard lk(state_->mutex); - state_->finished = true; - } + std::lock_guard lk(state_->mutex); + state_->finished = true; + // Keep the lock when notifying to avoid situations where the SerialExecutor + // would start being destroyed while the notify_one() call is still ongoing. state_->wait_for_tasks.notify_one(); } diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 252de8ea6aa..c4d4d1869c6 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -233,15 +233,7 @@ class ARROW_EXPORT SerialExecutor : public Executor { if (final_fut.is_finished()) { return final_fut.result(); } - final_fut = final_fut.Then( - [this](const T& res) { - MarkFinished(); - return res; - }, - [this](const Status& st) -> Result { - MarkFinished(); - return st; - }); + final_fut.AddCallback([this](const Result&) { MarkFinished(); }); RunLoop(); return final_fut.result(); } diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index 3529e9b6a4d..2390f8c1a41 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -123,19 +123,21 @@ class AddTester { std::vector outs_; }; -class SerialExecutorTests : public testing::TestWithParam { +class TestRunSynchronously : public testing::TestWithParam { public: bool UseThreads() { return GetParam(); } + template Result Run(FnOnce(Executor*)> top_level_task) { return RunSynchronously(std::move(top_level_task), UseThreads()); } + Status RunVoid(FnOnce(Executor*)> top_level_task) { return RunSynchronouslyVoid(std::move(top_level_task), UseThreads()); } }; -TEST_P(SerialExecutorTests, SimpleRun) { +TEST_P(TestRunSynchronously, SimpleRun) { bool task_ran = false; auto task = [&](Executor* executor) { EXPECT_NE(executor, nullptr); @@ -146,7 +148,7 @@ TEST_P(SerialExecutorTests, SimpleRun) { EXPECT_TRUE(task_ran); } -TEST_P(SerialExecutorTests, SpawnNested) { +TEST_P(TestRunSynchronously, SpawnNested) { bool nested_ran = false; auto top_level_task = [&](Executor* executor) { return DeferNotOk(executor->Submit([&] { @@ -158,27 +160,55 @@ TEST_P(SerialExecutorTests, SpawnNested) { EXPECT_TRUE(nested_ran); } -TEST_P(SerialExecutorTests, WithResult) { +TEST_P(TestRunSynchronously, SpawnMoreNested) { + std::atomic nested_ran{0}; + auto top_level_task = [&](Executor* executor) -> Future<> { + auto fut_a = DeferNotOk(executor->Submit([&] { nested_ran++; })); + auto fut_b = DeferNotOk(executor->Submit([&] { nested_ran++; })); + return AllComplete({fut_a, fut_b}) + .Then([&](const Result& result) { + nested_ran++; + return result; + }); + }; + ASSERT_OK(RunVoid(std::move(top_level_task))); + EXPECT_EQ(nested_ran, 3); +} + +TEST_P(TestRunSynchronously, WithResult) { auto top_level_task = [&](Executor* executor) { return DeferNotOk(executor->Submit([] { return 42; })); }; ASSERT_OK_AND_EQ(42, Run(std::move(top_level_task))); } -TEST_P(SerialExecutorTests, StopToken) { +TEST_P(TestRunSynchronously, StopTokenSpawn) { bool nested_ran = false; StopSource stop_source; auto top_level_task = [&](Executor* executor) -> Future<> { stop_source.RequestStop(Status::Invalid("XYZ")); RETURN_NOT_OK(executor->Spawn([&] { nested_ran = true; }, stop_source.token())); - auto result = DeferNotOk(executor->Submit([&] { return Status::OK(); })); - return result; + return Future<>::MakeFinished(); }; ASSERT_OK(RunVoid(std::move(top_level_task))); EXPECT_FALSE(nested_ran); } -TEST_P(SerialExecutorTests, ContinueAfterExternal) { +TEST_P(TestRunSynchronously, StopTokenSubmit) { + bool nested_ran = false; + StopSource stop_source; + auto top_level_task = [&](Executor* executor) -> Future<> { + stop_source.RequestStop(); + return DeferNotOk(executor->Submit(stop_source.token(), [&] { + nested_ran = true; + return Status::OK(); + })); + }; + ASSERT_RAISES(Cancelled, RunVoid(std::move(top_level_task))); + EXPECT_FALSE(nested_ran); +} + +TEST_P(TestRunSynchronously, ContinueAfterExternal) { bool continuation_ran = false; EXPECT_OK_AND_ASSIGN(auto mock_io_pool, ThreadPool::Make(1)); auto top_level_task = [&](Executor* executor) { @@ -200,19 +230,19 @@ TEST_P(SerialExecutorTests, ContinueAfterExternal) { EXPECT_TRUE(continuation_ran); } -TEST_P(SerialExecutorTests, SchedulerAbort) { +TEST_P(TestRunSynchronously, SchedulerAbort) { auto top_level_task = [&](Executor* executor) { return Status::Invalid("XYZ"); }; ASSERT_RAISES(Invalid, RunVoid(std::move(top_level_task))); } -TEST_P(SerialExecutorTests, PropagatedError) { +TEST_P(TestRunSynchronously, PropagatedError) { auto top_level_task = [&](Executor* executor) { return DeferNotOk(executor->Submit([] { return Status::Invalid("XYZ"); })); }; ASSERT_RAISES(Invalid, RunVoid(std::move(top_level_task))); } -INSTANTIATE_TEST_SUITE_P(SerialExecutorTests, SerialExecutorTests, +INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously, ::testing::Values(false, true)); class TestThreadPool : public ::testing::Test { From c215a31aee210df04ac0a292051de26fe8b479a6 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 8 Apr 2021 19:48:02 -1000 Subject: [PATCH 9/9] ARROW-12208: Instead of calling task_group->Finish and then AllComplete(futures) I added task_group->FinishAsync to the futures and called AllComplete on that. --- cpp/src/arrow/dataset/file_base.cc | 2 +- cpp/src/arrow/dataset/scanner.cc | 18 ++++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index be283491ade..ad19bd2041e 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -448,7 +448,7 @@ Future<> WriteInternal(const ScanOptions& scan_options, WriteState& state, }); } } - RETURN_NOT_OK(task_group->Finish()); + scan_futs.push_back(task_group->FinishAsync()); return AllComplete(scan_futs); } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 967c2c30a3b..a8ac24b7799 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -241,17 +241,15 @@ Future> Scanner::ToTableInternal( } } auto scan_options = scan_options_; - // Wait for all async tasks to complete, or the first error + scan_futures.push_back(task_group->FinishAsync()); + // Wait for all tasks to complete, or the first error return AllComplete(scan_futures) - .Then([task_group, scan_options, - state](const detail::Empty&) -> Result> { - // Wait for all sync tasks to complete, or the first error. - RETURN_NOT_OK(task_group->Finish()); - - return Table::FromRecordBatches( - scan_options->projected_schema, - FlattenRecordBatchVector(std::move(state->batches))); - }); + .Then( + [scan_options, state](const detail::Empty&) -> Result> { + return Table::FromRecordBatches( + scan_options->projected_schema, + FlattenRecordBatchVector(std::move(state->batches))); + }); } } // namespace dataset