7 changes: 6 additions & 1 deletion cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -34,7 +34,12 @@ add_arrow_compute_test(hash_join_node_test
                       key_hash_test.cc)
add_arrow_compute_test(tpch_node_test PREFIX "arrow-compute")
add_arrow_compute_test(union_node_test PREFIX "arrow-compute")
add_arrow_compute_test(util_test PREFIX "arrow-compute")
add_arrow_compute_test(util_test
                       PREFIX
                       "arrow-compute"
                       SOURCES
                       util_test.cc
                       task_util_test.cc)

add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")

17 changes: 17 additions & 0 deletions cpp/src/arrow/compute/exec/task_util.cc
@@ -96,6 +96,11 @@ class TaskSchedulerImpl : public TaskScheduler {
  // fields), aborted_ flag and register_finished_ flag

  AtomicWithPadding<int> num_tasks_to_schedule_;
  // If a task group adds tasks while a thread inside ScheduleMore is picking
  // tasks to run, that thread can miss the new tasks. This flag notifies the
  // scheduling thread that it may need to make another pass through the
  // scheduler.
  AtomicWithPadding<bool> tasks_added_recently_;
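  // (Assumed interleaving this guards against, shown for illustration only:
  // thread A in ScheduleMore finds no tasks left to hand out just as thread B
  // adds new tasks in StartTaskGroup; without this flag thread A could return
  // without scheduling them.)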
};

TaskSchedulerImpl::TaskSchedulerImpl()
@@ -104,6 +109,7 @@ TaskSchedulerImpl::TaskSchedulerImpl()
      aborted_(false),
      register_finished_(false) {
  num_tasks_to_schedule_.value.store(0);
  tasks_added_recently_.value.store(false);
}

int TaskSchedulerImpl::RegisterTaskGroup(TaskImpl task_impl,
@@ -150,6 +156,7 @@ Status TaskSchedulerImpl::StartTaskGroup(size_t thread_id, int group_id,
  }

  if (!aborted) {
    tasks_added_recently_.value.store(true);
    return ScheduleMore(thread_id);
  } else {
    return Status::Cancelled("Scheduler cancelled");
@@ -343,6 +350,16 @@ Status TaskSchedulerImpl::ScheduleMore(size_t thread_id, int num_tasks_finished)
    num_tasks_to_schedule_.value += num_new_tasks - static_cast<int>(tasks.size());
  }

  // A task group may have been started (setting tasks_added_recently_ in
  // StartTaskGroup) after the task selection above. Clear the flag and, if no
  // tasks were selected, make another pass so the new tasks are not stranded.
  bool expected_might_have_missed_tasks = true;
  if (tasks_added_recently_.value.compare_exchange_strong(
          expected_might_have_missed_tasks, false)) {
    if (tasks.empty()) {
      // num_tasks_finished has already been added to num_tasks_to_schedule so
      // pass 0 here.
      return ScheduleMore(thread_id);
    }
  }

  for (size_t i = 0; i < tasks.size(); ++i) {
    int group_id = tasks[i].first;
    int64_t task_id = tasks[i].second;
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/task_util.h
@@ -53,7 +53,7 @@ class AtomicWithPadding {
//
// Also allows for executing next pending tasks immediately using a caller thread.
//
class TaskScheduler {
class ARROW_EXPORT TaskScheduler {
 public:
  using TaskImpl = std::function<Status(size_t, int64_t)>;
  using TaskGroupContinuationImpl = std::function<Status(size_t)>;
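Note on the ARROW_EXPORT additions here and in util.h below: the new task_util_test binary links against the Arrow shared library, so TaskScheduler and ThreadIndexer presumably need their symbols exported to be visible to the test. A rough, hypothetical sketch of what such a visibility macro expands to (the real definition lives in Arrow's visibility header and may differ):

// Hypothetical simplification of ARROW_EXPORT, for illustration only
#if defined(_WIN32)
#define ARROW_EXPORT __declspec(dllexport)  // dllimport on the consuming side
#else
#define ARROW_EXPORT __attribute__((visibility("default")))
#endif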
228 changes: 228 additions & 0 deletions cpp/src/arrow/compute/exec/task_util_test.cc
@@ -0,0 +1,228 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec/task_util.h"

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>

#include <gtest/gtest.h>

#include "arrow/compute/exec/util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/thread_pool.h"

namespace arrow {

using internal::ThreadPool;

namespace compute {

/// \brief Create a thread pool and start all threads
///
/// By default a thread pool will not create threads until they
/// are actually needed. This can make it a bit difficult to
/// reproduce certain issues. This creates a thread pool and
/// then makes sure the threads are actually created before
/// returning it.
Result<std::shared_ptr<ThreadPool>> MakePrimedThreadPool(int num_threads) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> thread_pool,
                        ThreadPool::Make(num_threads));
  int num_threads_running = 0;
  std::mutex mutex;
  std::condition_variable thread_gate;
  std::condition_variable primer_gate;
  for (int i = 0; i < num_threads; i++) {
    // This shouldn't fail, and if it failed midway we would leave the already
    // spawned threads running if we did RETURN_NOT_OK, so use ABORT_NOT_OK.
    ABORT_NOT_OK(thread_pool->Spawn([&] {
      std::unique_lock<std::mutex> lk(mutex);
      num_threads_running++;
      primer_gate.notify_one();
      thread_gate.wait(lk);
    }));
  }
  std::unique_lock<std::mutex> primer_lock(mutex);
  primer_gate.wait(primer_lock, [&] { return num_threads_running == num_threads; });
  thread_gate.notify_all();
  primer_lock.unlock();
  thread_pool->WaitForIdle();
  return thread_pool;
}
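// Usage note: StressTwo below relies on this so that the very first
// StartTaskGroup calls can already race against ScheduleMore on other threads
// instead of waiting for the pool to lazily spawn its workers.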

Status SlowTaskImpl(std::size_t, int64_t) {
  SleepABit();
  return Status::OK();
}
Status FastTaskImpl(std::size_t, int64_t) { return Status::OK(); }
// If this is the last task group in the stage to finish, start the next stage
TaskScheduler::TaskGroupContinuationImpl MakeContinuation(
    std::atomic<int>* counter, std::function<void(std::size_t, int)> start_next_stage,
    int next_stage) {
  return [counter, start_next_stage, next_stage](std::size_t thread_id) {
    if (counter->fetch_sub(1) == 1) {
      start_next_stage(thread_id, next_stage);
    }
    return Status::OK();
  };
}
// Signal the cv if this is the last group to finish
TaskScheduler::TaskGroupContinuationImpl MakeFinalContinuation(
    std::atomic<int>* counter, std::mutex* mutex, std::condition_variable* finish) {
  return [=](std::size_t thread_id) {
    if (counter->fetch_sub(1) == 1) {
      std::lock_guard<std::mutex> lg(*mutex);
      finish->notify_one();
    }
    return Status::OK();
  };
}
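// The Stress test below combines these: groups in every stage but the last get
// MakeContinuation pointing at the following stage, and groups in the final
// stage get MakeFinalContinuation so the test body can wait on finish_cv.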

// This test simulates one of the current use patterns of the
// task scheduler. There are a number of groups. The groups
// are allocated to stages. All groups in a stage execute
// concurrently. When all groups in that stage finish the next
// stage is started.
TEST(TaskScheduler, Stress) {
  constexpr int kNumThreads = 8;
  constexpr int kNumGroups = 8;
  constexpr int kGroupsPerStage = 3;
  constexpr int kTasksPerGroup = 32;
  constexpr int kNumStages = (kNumGroups % kGroupsPerStage == 0)
                                 ? (kNumGroups / kGroupsPerStage)
                                 : (kNumGroups / kGroupsPerStage) + 1;
  constexpr int kTrailingGroups = (kNumGroups % kGroupsPerStage == 0)
                                      ? kGroupsPerStage
                                      : kNumGroups % kGroupsPerStage;
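  // With 8 groups and 3 per stage this works out to ceil(8 / 3) = 3 stages of
  // 3, 3, and 2 groups, so kTrailingGroups == 2.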

  ThreadIndexer thread_indexer;
  int num_threads = std::min(static_cast<int>(thread_indexer.Capacity()), kNumThreads);
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ThreadPool> thread_pool,
                       ThreadPool::Make(num_threads));

  std::array<std::atomic<int>, kNumStages - 1> stage_counters;
  for (int i = 0; i < kNumStages - 1; i++) {
    stage_counters[i].store(kGroupsPerStage);
  }
  std::atomic<int> final_counter(kTrailingGroups);
  std::mutex mutex;
  std::condition_variable finish_cv;

  std::vector<int> group_ids;
  auto scheduler = TaskScheduler::Make();

  std::function<void(std::size_t, int)> start_next_stage = [&](std::size_t thread_id,
                                                               int stage_index) {
    int start = stage_index * kGroupsPerStage;
    int end = std::min(kNumGroups, start + kGroupsPerStage);
    for (int i = start; i < end; i++) {
      ASSERT_OK(thread_pool->Spawn([&, i] {
        std::size_t my_thread_id = thread_indexer();
        SleepABit();
        ASSERT_OK(scheduler->StartTaskGroup(my_thread_id, group_ids[i], kTasksPerGroup));
      }));
    }
  };

  for (auto i = 0; i < kNumGroups; i++) {
    int next_stage = (i / kGroupsPerStage) + 1;
    TaskScheduler::TaskGroupContinuationImpl finish =
        MakeFinalContinuation(&final_counter, &mutex, &finish_cv);
    if (next_stage < kNumStages) {
      finish =
          MakeContinuation(&stage_counters[next_stage - 1], start_next_stage, next_stage);
    }
    group_ids.push_back(scheduler->RegisterTaskGroup(SlowTaskImpl, finish));
  }
  scheduler->RegisterEnd();

  TaskScheduler::AbortContinuationImpl abort = [] { FAIL() << "Unexpected abort"; };
  TaskScheduler::ScheduleImpl schedule =
      [&](TaskScheduler::TaskGroupContinuationImpl task) {
        return thread_pool->Spawn([&, task] {
          std::size_t thread_id = thread_indexer();
          ASSERT_OK(task(thread_id));
        });
      };
  std::unique_lock<std::mutex> lock(mutex);
  ASSERT_OK(thread_pool->Spawn([&] {
    std::size_t thread_id = thread_indexer();
    ASSERT_OK(scheduler->StartScheduling(thread_id, schedule, num_threads * 4, false));
    start_next_stage(thread_id, 0);
  }));

  finish_cv.wait(lock);
  thread_pool->WaitForIdle();
}

// This is a reproducer for a bug that was encountered when one
// thread starts a task group while another thread is finishing
// the last of its tasks.
TEST(TaskScheduler, StressTwo) {
  constexpr int kNumThreads = 16;
  constexpr int kNumGroups = 8;
  constexpr int kTasksPerGroup = 1;
  constexpr int kIterations = 1000;
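  // A single task per group keeps each group short-lived, making it likely that
  // one thread is inside StartTaskGroup while another thread finishes the last
  // task of a different group, which is the interleaving described above.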

  ThreadIndexer thread_indexer;
  int num_threads = std::min(static_cast<int>(thread_indexer.Capacity()), kNumThreads);
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ThreadPool> thread_pool,
                       MakePrimedThreadPool(num_threads));

  for (int i = 0; i < kIterations; i++) {
    std::atomic<int> final_counter(kNumGroups);
    std::mutex mutex;
    std::condition_variable finish_cv;

    std::vector<int> group_ids;
    auto scheduler = TaskScheduler::Make();

    for (auto i = 0; i < kNumGroups; i++) {
      TaskScheduler::TaskGroupContinuationImpl finish =
          MakeFinalContinuation(&final_counter, &mutex, &finish_cv);
      group_ids.push_back(scheduler->RegisterTaskGroup(FastTaskImpl, finish));
    }
    scheduler->RegisterEnd();

    TaskScheduler::AbortContinuationImpl abort = [] { FAIL() << "Unexpected abort"; };
    TaskScheduler::ScheduleImpl schedule =
        [&](TaskScheduler::TaskGroupContinuationImpl task) {
          return thread_pool->Spawn([&, task] {
            std::size_t thread_id = thread_indexer();
            ASSERT_OK(task(thread_id));
          });
        };

    ASSERT_OK(scheduler->StartScheduling(0, schedule, num_threads * 4, false));
    std::unique_lock<std::mutex> lock(mutex);
    for (int i = 0; i < kNumGroups; i++) {
      ASSERT_OK(thread_pool->Spawn([&, i] {
        std::size_t thread_id = thread_indexer();
        ASSERT_OK(scheduler->StartTaskGroup(thread_id, i, kTasksPerGroup));
      }));
    }

    finish_cv.wait(lock);
    thread_pool->WaitForIdle();
  }
}

} // namespace compute
} // namespace arrow
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/util.h
@@ -280,7 +280,7 @@ class AtomicCounter {
  std::atomic<bool> complete_{false};
};

class ThreadIndexer {
class ARROW_EXPORT ThreadIndexer {
 public:
  size_t operator()();
