From a6fd18b2ec870dc3eafea4782a27fb98750b01f1 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 6 May 2022 14:38:13 -1000 Subject: [PATCH 1/6] ARROW-16498: Add some stress tests for the TaskScheduler. Fix a deadlock in the task scheduler. --- cpp/src/arrow/compute/exec/CMakeLists.txt | 2 +- cpp/src/arrow/compute/exec/task_util.cc | 17 ++ cpp/src/arrow/compute/exec/task_util_test.cc | 211 +++++++++++++++++++ 3 files changed, 229 insertions(+), 1 deletion(-) create mode 100644 cpp/src/arrow/compute/exec/task_util_test.cc diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index b2a21c2bd6b..81391505d9e 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -34,7 +34,7 @@ add_arrow_compute_test(hash_join_node_test key_hash_test.cc) add_arrow_compute_test(tpch_node_test PREFIX "arrow-compute") add_arrow_compute_test(union_node_test PREFIX "arrow-compute") -add_arrow_compute_test(util_test PREFIX "arrow-compute") +add_arrow_compute_test(util_test PREFIX "arrow-compute" SOURCES util_test.cc task_util_test.cc) add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/exec/task_util.cc b/cpp/src/arrow/compute/exec/task_util.cc index e5e714d34ab..8568450119a 100644 --- a/cpp/src/arrow/compute/exec/task_util.cc +++ b/cpp/src/arrow/compute/exec/task_util.cc @@ -96,6 +96,11 @@ class TaskSchedulerImpl : public TaskScheduler { // fields), aborted_ flag and register_finished_ flag AtomicWithPadding num_tasks_to_schedule_; + // If a task group adds tasks it's possible for a thread inside + // ScheduleMore to miss this fact. This serves as a flag to + // notify the scheduling thread that it might need to make + // another pass through the scheduler + AtomicWithPadding tasks_added_recently_; }; TaskSchedulerImpl::TaskSchedulerImpl() @@ -104,6 +109,7 @@ TaskSchedulerImpl::TaskSchedulerImpl() aborted_(false), register_finished_(false) { num_tasks_to_schedule_.value.store(0); + tasks_added_recently_.value.store(false); } int TaskSchedulerImpl::RegisterTaskGroup(TaskImpl task_impl, @@ -150,6 +156,7 @@ Status TaskSchedulerImpl::StartTaskGroup(size_t thread_id, int group_id, } if (!aborted) { + tasks_added_recently_.value.store(true); return ScheduleMore(thread_id); } else { return Status::Cancelled("Scheduler cancelled"); @@ -343,6 +350,16 @@ Status TaskSchedulerImpl::ScheduleMore(size_t thread_id, int num_tasks_finished) num_tasks_to_schedule_.value += num_new_tasks - static_cast(tasks.size()); } + bool expected_might_have_missed_tasks = true; + if (tasks_added_recently_.value.compare_exchange_strong( + expected_might_have_missed_tasks, false)) { + if (tasks.empty()) { + // num_tasks_finished has already been added to num_tasks_to_schedule so + // pass 0 here. + return ScheduleMore(thread_id); + } + } + for (size_t i = 0; i < tasks.size(); ++i) { int group_id = tasks[i].first; int64_t task_id = tasks[i].second; diff --git a/cpp/src/arrow/compute/exec/task_util_test.cc b/cpp/src/arrow/compute/exec/task_util_test.cc new file mode 100644 index 00000000000..124ee71b830 --- /dev/null +++ b/cpp/src/arrow/compute/exec/task_util_test.cc @@ -0,0 +1,211 @@ +#include "arrow/compute/exec/task_util.h" + +#include +#include +#include +#include +#include + +#include + +#include "arrow/compute/exec/util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/thread_pool.h" + +namespace arrow { + +using internal::ThreadPool; + +namespace compute { + +/// \brief Create a thread pool and start all threads +/// +/// By default a thread pool will not create threads until they +/// are actually needed. This can make it a bit difficult to +/// reproduce certain issues. This creates a thread pool and +/// then makes sure the threads are actually created before +/// returning it. +Result> MakePrimedThreadPool(int num_threads) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr thread_pool, + ThreadPool::Make(num_threads)); + int num_threads_running = 0; + std::mutex mutex; + std::condition_variable thread_gate; + std::condition_variable primer_gate; + for (int i = 0; i < num_threads; i++) { + // This shouldn't fail and, if it fails midway, we will have some threads + // still running if we do RETURN_NOT_OK so lets do ABORT_NOT_OK + ABORT_NOT_OK(thread_pool->Spawn([&] { + std::unique_lock lk(mutex); + num_threads_running++; + primer_gate.notify_one(); + thread_gate.wait(lk); + })); + } + std::unique_lock primer_lock(mutex); + primer_gate.wait(primer_lock, [&] { return num_threads_running == num_threads; }); + thread_gate.notify_all(); + primer_lock.unlock(); + thread_pool->WaitForIdle(); + return thread_pool; +} + +Status SlowTaskImpl(std::size_t, int64_t) { + SleepABit(); + return Status::OK(); +} +Status FastTaskImpl(std::size_t, int64_t) { return Status::OK(); } +// If this is the last task group then start the next stage +TaskScheduler::TaskGroupContinuationImpl MakeContinuation( + std::atomic* counter, std::function start_next_stage, + int next_stage) { + return [counter, start_next_stage, next_stage](std::size_t thread_id) { + if (counter->fetch_sub(1) == 1) { + start_next_stage(thread_id, next_stage); + } + return Status::OK(); + }; +} +// Signal the cv if this is the last group +TaskScheduler::TaskGroupContinuationImpl MakeFinalContinuation( + std::atomic* counter, std::mutex* mutex, std::condition_variable* finish) { + return [=](std::size_t thread_id) { + if (counter->fetch_sub(1) == 1) { + std::lock_guard lg(*mutex); + finish->notify_one(); + } + return Status::OK(); + }; +} + +// This test simulates one of the current use patterns of the +// task scheduler. There are a number of groups. The groups +// are allocated to stages. All groups in a stage execute +// concurrently. When all groups in that stage finish the next +// stage is started. +TEST(TaskScheduler, Stress) { + constexpr int kNumThreads = 8; + constexpr int kNumGroups = 8; + constexpr int kGroupsPerStage = 3; + constexpr int kTasksPerGroup = 32; + constexpr int kNumStages = (kNumGroups % kGroupsPerStage == 0) + ? (kNumGroups / kGroupsPerStage) + : (kNumGroups / kGroupsPerStage) + 1; + constexpr int kTrailingGroups = (kNumGroups % kGroupsPerStage == 0) + ? kGroupsPerStage + : kNumGroups % kGroupsPerStage; + + ThreadIndexer thread_indexer; + int num_threads = std::min(static_cast(thread_indexer.Capacity()), kNumThreads); + ASSERT_OK_AND_ASSIGN(std::shared_ptr thread_pool, + ThreadPool::Make(num_threads)); + + std::array, kNumStages - 1> stage_counters; + for (int i = 0; i < kNumStages - 1; i++) { + stage_counters[i].store(kGroupsPerStage); + } + std::atomic final_counter(kTrailingGroups); + std::mutex mutex; + std::condition_variable finish_cv; + + std::vector group_ids; + auto scheduler = TaskScheduler::Make(); + + std::function start_next_stage = [&](std::size_t thread_id, + int stage_index) { + int start = stage_index * kGroupsPerStage; + int end = std::min(kNumGroups, start + kGroupsPerStage); + for (int i = start; i < end; i++) { + ASSERT_OK(thread_pool->Spawn([&scheduler, &group_ids, &thread_indexer, i] { + std::size_t my_thread_id = thread_indexer(); + SleepABit(); + ASSERT_OK(scheduler->StartTaskGroup(my_thread_id, group_ids[i], kTasksPerGroup)); + })); + } + }; + + for (auto i = 0; i < kNumGroups; i++) { + int next_stage = (i / kGroupsPerStage) + 1; + TaskScheduler::TaskGroupContinuationImpl finish = + MakeFinalContinuation(&final_counter, &mutex, &finish_cv); + if (next_stage < kNumStages) { + finish = + MakeContinuation(&stage_counters[next_stage - 1], start_next_stage, next_stage); + } + group_ids.push_back(scheduler->RegisterTaskGroup(SlowTaskImpl, finish)); + } + scheduler->RegisterEnd(); + + TaskScheduler::AbortContinuationImpl abort = [] { FAIL() << "Unexpected abort"; }; + TaskScheduler::ScheduleImpl schedule = + [&](TaskScheduler::TaskGroupContinuationImpl task) { + return thread_pool->Spawn([&, task] { + std::size_t thread_id = thread_indexer(); + ASSERT_OK(task(thread_id)); + }); + }; + std::unique_lock lock(mutex); + ASSERT_OK(thread_pool->Spawn([&] { + std::size_t thread_id = thread_indexer(); + ASSERT_OK(scheduler->StartScheduling(thread_id, schedule, num_threads * 4, false)); + start_next_stage(thread_id, 0); + })); + + finish_cv.wait(lock); + thread_pool->WaitForIdle(); +} + +// This is a reproducer for a bug that was encountered when one +// thread starts a task group while another thread is finishing +// the last of its tasks. +TEST(TaskScheduler, StressTwo) { + constexpr int kNumThreads = 16; + constexpr int kNumGroups = 8; + constexpr int kTasksPerGroup = 1; + constexpr int kIterations = 1000; + + ThreadIndexer thread_indexer; + int num_threads = std::min(static_cast(thread_indexer.Capacity()), kNumThreads); + ASSERT_OK_AND_ASSIGN(std::shared_ptr thread_pool, + MakePrimedThreadPool(num_threads)); + + for (int i = 0; i < kIterations; i++) { + std::atomic final_counter(kNumGroups); + std::mutex mutex; + std::condition_variable finish_cv; + + std::vector group_ids; + auto scheduler = TaskScheduler::Make(); + + for (auto i = 0; i < kNumGroups; i++) { + TaskScheduler::TaskGroupContinuationImpl finish = + MakeFinalContinuation(&final_counter, &mutex, &finish_cv); + group_ids.push_back(scheduler->RegisterTaskGroup(FastTaskImpl, finish)); + } + scheduler->RegisterEnd(); + + TaskScheduler::AbortContinuationImpl abort = [] { FAIL() << "Unexpected abort"; }; + TaskScheduler::ScheduleImpl schedule = + [&](TaskScheduler::TaskGroupContinuationImpl task) { + return thread_pool->Spawn([&, task] { + std::size_t thread_id = thread_indexer(); + ASSERT_OK(task(thread_id)); + }); + }; + + ASSERT_OK(scheduler->StartScheduling(0, schedule, num_threads * 4, false)); + std::unique_lock lock(mutex); + for (int i = 0; i < kNumGroups; i++) { + ASSERT_OK(thread_pool->Spawn([&, i] { + std::size_t thread_id = thread_indexer(); + ASSERT_OK(scheduler->StartTaskGroup(thread_id, i, kTasksPerGroup)); + })); + } + + finish_cv.wait(lock); + thread_pool->WaitForIdle(); + } +} + +} // namespace compute +} // namespace arrow From e2d04282c7a46be7fde01c88ba5366664bef1190 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 9 May 2022 11:52:09 -1000 Subject: [PATCH 2/6] ARROW-16498: Adding missing copyright header --- cpp/src/arrow/compute/exec/task_util_test.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/cpp/src/arrow/compute/exec/task_util_test.cc b/cpp/src/arrow/compute/exec/task_util_test.cc index 124ee71b830..404333b001e 100644 --- a/cpp/src/arrow/compute/exec/task_util_test.cc +++ b/cpp/src/arrow/compute/exec/task_util_test.cc @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #include "arrow/compute/exec/task_util.h" #include From 214cc1bcfd8d7841b7a7213ab4e28d13060c3b2c Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 9 May 2022 15:49:00 -1000 Subject: [PATCH 3/6] ARROW-16498: cmake format --- cpp/src/arrow/compute/exec/CMakeLists.txt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 81391505d9e..0a5f1f30ffa 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -34,7 +34,12 @@ add_arrow_compute_test(hash_join_node_test key_hash_test.cc) add_arrow_compute_test(tpch_node_test PREFIX "arrow-compute") add_arrow_compute_test(union_node_test PREFIX "arrow-compute") -add_arrow_compute_test(util_test PREFIX "arrow-compute" SOURCES util_test.cc task_util_test.cc) +add_arrow_compute_test(util_test + PREFIX + "arrow-compute" + SOURCES + util_test.cc + task_util_test.cc) add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") From 600569e03e061951f8359fcfe406b193558eebbc Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 13 May 2022 11:07:15 -1000 Subject: [PATCH 4/6] ARROW-16498: exporting ThreadIndexer as it is now relied on by some unit tests --- cpp/src/arrow/compute/exec/util.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h index b0e423c8580..036e60345ec 100644 --- a/cpp/src/arrow/compute/exec/util.h +++ b/cpp/src/arrow/compute/exec/util.h @@ -280,7 +280,7 @@ class AtomicCounter { std::atomic complete_{false}; }; -class ThreadIndexer { +class ARROW_EXPORT ThreadIndexer { public: size_t operator()(); From 9a323675a93ef81f17fa9b18c48c08a3eaec6dd5 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 13 May 2022 11:27:02 -1000 Subject: [PATCH 5/6] ARROW-16498: Exporting TaskScheduler as it is now needed by some unit tests --- cpp/src/arrow/compute/exec/task_util.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/task_util.h b/cpp/src/arrow/compute/exec/task_util.h index 44540d255df..5dc66aee167 100644 --- a/cpp/src/arrow/compute/exec/task_util.h +++ b/cpp/src/arrow/compute/exec/task_util.h @@ -53,7 +53,7 @@ class AtomicWithPadding { // // Also allows for executing next pending tasks immediately using a caller thread. // -class TaskScheduler { +class ARROW_EXPORT TaskScheduler { public: using TaskImpl = std::function; using TaskGroupContinuationImpl = std::function; From c17c8be98ed8648612cf48d38b0da934a9e06ad6 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 13 May 2022 11:48:22 -1000 Subject: [PATCH 6/6] ARROW-16498: Modifying captures slightly since some MSVC versions require you to capture constants --- cpp/src/arrow/compute/exec/task_util_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/task_util_test.cc b/cpp/src/arrow/compute/exec/task_util_test.cc index 404333b001e..fd9502bdfac 100644 --- a/cpp/src/arrow/compute/exec/task_util_test.cc +++ b/cpp/src/arrow/compute/exec/task_util_test.cc @@ -133,7 +133,7 @@ TEST(TaskScheduler, Stress) { int start = stage_index * kGroupsPerStage; int end = std::min(kNumGroups, start + kGroupsPerStage); for (int i = start; i < end; i++) { - ASSERT_OK(thread_pool->Spawn([&scheduler, &group_ids, &thread_indexer, i] { + ASSERT_OK(thread_pool->Spawn([&, i] { std::size_t my_thread_id = thread_indexer(); SleepABit(); ASSERT_OK(scheduler->StartTaskGroup(my_thread_id, group_ids[i], kTasksPerGroup));