diff --git a/cpp/src/arrow/testing/executor_util.h b/cpp/src/arrow/testing/executor_util.h new file mode 100644 index 00000000000..e34fc858d07 --- /dev/null +++ b/cpp/src/arrow/testing/executor_util.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/util/thread_pool.h" + +namespace arrow { + +/// An executor which synchronously runs the task as part of the SpawnReal call. +class MockExecutor : public internal::Executor { + public: + int GetCapacity() override { return 0; } + + Status SpawnReal(internal::TaskHints hints, internal::FnOnce task, StopToken, + StopCallback&&) override { + spawn_count++; + std::move(task)(); + return Status::OK(); + } + + int spawn_count = 0; +}; + +/// An executor which does not actually run the task. Can be used to simulate situations +/// where the executor schedules a task in a long queue and doesn't get around to running +/// it for a while +class DelayedExecutor : public internal::Executor { + public: + int GetCapacity() override { return 0; } + + Status SpawnReal(internal::TaskHints hints, internal::FnOnce task, StopToken, + StopCallback&&) override { + captured_tasks.push_back(std::move(task)); + return Status::OK(); + } + + std::vector> captured_tasks; +}; + +} // namespace arrow diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index c7d7c37ad33..b329f99ed17 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -26,6 +26,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" +#include "arrow/util/thread_pool.h" namespace arrow { @@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl { void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); } - void AddCallback(Callback callback) { + void CheckOptions(const CallbackOptions& opts) { + if (opts.should_schedule != ShouldSchedule::Never) { + DCHECK_NE(opts.executor, nullptr) + << "An executor must be specified when adding a callback that might schedule"; + } + } + + void AddCallback(Callback callback, CallbackOptions opts) { + CheckOptions(opts); std::unique_lock lock(mutex_); + CallbackRecord callback_record{std::move(callback), opts}; if (IsFutureFinished(state_)) { lock.unlock(); - std::move(callback)(); + RunOrScheduleCallback(std::move(callback_record), /*in_add_callback=*/true); } else { - callbacks_.push_back(std::move(callback)); + callbacks_.push_back(std::move(callback_record)); } } - bool TryAddCallback(const std::function& callback_factory) { + bool TryAddCallback(const std::function& callback_factory, + CallbackOptions opts) { + CheckOptions(opts); std::unique_lock lock(mutex_); if (IsFutureFinished(state_)) { return false; } else { - callbacks_.push_back(callback_factory()); + callbacks_.push_back({callback_factory(), opts}); return true; } } + bool ShouldScheduleCallback(const CallbackRecord& callback_record, + bool in_add_callback) { + switch (callback_record.options.should_schedule) { + case ShouldSchedule::Never: + return false; + case ShouldSchedule::Always: + return true; + case ShouldSchedule::IfUnfinished: + return !in_add_callback; + default: + DCHECK(false) << "Unrecognized ShouldSchedule option"; + return false; + } + } + + void RunOrScheduleCallback(CallbackRecord&& callback_record, bool in_add_callback) { + if (ShouldScheduleCallback(callback_record, in_add_callback)) { + struct CallbackTask { + void operator()() { std::move(callback)(*self); } + + Callback callback; + std::shared_ptr self; + }; + // Need to keep `this` alive until the callback has a chance to be scheduled. + CallbackTask task{std::move(callback_record.callback), shared_from_this()}; + DCHECK_OK(callback_record.options.executor->Spawn(std::move(task))); + } else { + std::move(callback_record.callback)(*this); + } + } + void DoMarkFinishedOrFailed(FutureState state) { { // Lock the hypothetical waiter first, and the future after. @@ -272,8 +315,8 @@ class ConcreteFutureImpl : public FutureImpl { // // In fact, it is important not to hold the locks because the callback // may be slow or do its own locking on other resources - for (auto&& callback : callbacks_) { - std::move(callback)(); + for (auto& callback_record : callbacks_) { + RunOrScheduleCallback(std::move(callback_record), /*in_add_callback=*/false); } callbacks_.clear(); } @@ -334,12 +377,13 @@ void FutureImpl::MarkFinished() { GetConcreteFuture(this)->DoMarkFinished(); } void FutureImpl::MarkFailed() { GetConcreteFuture(this)->DoMarkFailed(); } -void FutureImpl::AddCallback(Callback callback) { - GetConcreteFuture(this)->AddCallback(std::move(callback)); +void FutureImpl::AddCallback(Callback callback, CallbackOptions opts) { + GetConcreteFuture(this)->AddCallback(std::move(callback), opts); } -bool FutureImpl::TryAddCallback(const std::function& callback_factory) { - return GetConcreteFuture(this)->TryAddCallback(callback_factory); +bool FutureImpl::TryAddCallback(const std::function& callback_factory, + CallbackOptions opts) { + return GetConcreteFuture(this)->TryAddCallback(callback_factory, opts); } Future<> AllComplete(const std::vector>& futures) { diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 132443176ed..5fb17f95f2b 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE }; inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; } +/// \brief Describe whether the callback should be scheduled or run synchronously +enum class ShouldSchedule { + /// Always run the callback synchronously (the default) + Never = 0, + /// Schedule a new task only if the future is not finished when the + /// callback is added + IfUnfinished = 1, + /// Always schedule the callback as a new task + Always = 2 +}; + +/// \brief Options that control how a continuation is run +struct CallbackOptions { + /// Describe whether the callback should be run synchronously or scheduled + ShouldSchedule should_schedule = ShouldSchedule::Never; + /// If the callback is scheduled then this is the executor it should be scheduled + /// on. If this is NULL then should_schedule must be Never + internal::Executor* executor = NULL; + + static CallbackOptions Defaults() { return CallbackOptions(); } +}; + // Untyped private implementation -class ARROW_EXPORT FutureImpl { +class ARROW_EXPORT FutureImpl : public std::enable_shared_from_this { public: FutureImpl(); virtual ~FutureImpl() = default; @@ -218,10 +240,15 @@ class ARROW_EXPORT FutureImpl { void MarkFailed(); void Wait(); bool Wait(double seconds); + template + Result* CastResult() const { + return static_cast*>(result_.get()); + } - using Callback = internal::FnOnce; - void AddCallback(Callback callback); - bool TryAddCallback(const std::function& callback_factory); + using Callback = internal::FnOnce; + void AddCallback(Callback callback, CallbackOptions opts); + bool TryAddCallback(const std::function& callback_factory, + CallbackOptions opts); // Waiter API inline FutureState SetWaiter(FutureWaiter* w, int future_num); @@ -234,7 +261,11 @@ class ARROW_EXPORT FutureImpl { using Storage = std::unique_ptr; Storage result_{NULLPTR, NULLPTR}; - std::vector callbacks_; + struct CallbackRecord { + Callback callback; + CallbackOptions options; + }; + std::vector callbacks_; }; // An object that waits on multiple futures at once. Only one waiter @@ -453,30 +484,34 @@ class Future { /// cyclic reference to itself through the callback. template typename std::enable_if::value>::type - AddCallback(OnComplete on_complete) const { + AddCallback(OnComplete on_complete, + CallbackOptions opts = CallbackOptions::Defaults()) const { // We know impl_ will not be dangling when invoking callbacks because at least one // thread will be waiting for MarkFinished to return. Thus it's safe to keep a // weak reference to impl_ here struct Callback { - void operator()() && { std::move(on_complete)(weak_self.get().result()); } - WeakFuture weak_self; + void operator()(const FutureImpl& impl) && { + std::move(on_complete)(*impl.CastResult()); + } OnComplete on_complete; }; - impl_->AddCallback(Callback{WeakFuture(*this), std::move(on_complete)}); + impl_->AddCallback(Callback{std::move(on_complete)}, opts); } /// Overload for callbacks accepting a Status template typename std::enable_if::value>::type - AddCallback(OnComplete on_complete) const { + AddCallback(OnComplete on_complete, + CallbackOptions opts = CallbackOptions::Defaults()) const { static_assert(std::is_same::value, "Callbacks for Future<> should accept Status and not Result"); struct Callback { - void operator()() && { std::move(on_complete)(weak_self.get().status()); } - WeakFuture weak_self; + void operator()(const FutureImpl& impl) && { + std::move(on_complete)(impl.CastResult()->status()); + } OnComplete on_complete; }; - impl_->AddCallback(Callback{WeakFuture(*this), std::move(on_complete)}); + impl_->AddCallback(Callback{std::move(on_complete)}, opts); } /// \brief Overload of AddCallback that will return false instead of running @@ -495,30 +530,33 @@ class Future { template > typename std::enable_if::value, bool>::type - TryAddCallback(const CallbackFactory& callback_factory) const { + TryAddCallback(const CallbackFactory& callback_factory, + CallbackOptions opts = CallbackOptions::Defaults()) const { struct Callback { - void operator()() && { std::move(on_complete)(weak_self.get().result()); } - WeakFuture weak_self; + void operator()(const FutureImpl& impl) && { + std::move(on_complete)(*static_cast*>(impl.result_.get())); + } OnComplete on_complete; }; - return impl_->TryAddCallback([this, &callback_factory]() { - return Callback{WeakFuture(*this), callback_factory()}; - }); + return impl_->TryAddCallback( + [&callback_factory]() { return Callback{callback_factory()}; }, opts); } template > typename std::enable_if::value, bool>::type - TryAddCallback(const CallbackFactory& callback_factory) const { + TryAddCallback(const CallbackFactory& callback_factory, + CallbackOptions opts = CallbackOptions::Defaults()) const { struct Callback { - void operator()() && { std::move(on_complete)(weak_self.get().status()); } - WeakFuture weak_self; + void operator()(const FutureImpl& impl) && { + std::move(on_complete)( + static_cast*>(impl.result_.get())->status()); + } OnComplete on_complete; }; - return impl_->TryAddCallback([this, &callback_factory]() { - return Callback{WeakFuture(*this), callback_factory()}; - }); + return impl_->TryAddCallback( + [&callback_factory]() { return Callback{callback_factory()}; }, opts); } /// \brief Consumer API: Register a continuation to run when this future completes @@ -696,9 +734,7 @@ class Future { void Initialize() { impl_ = FutureImpl::Make(); } - Result* GetResult() const { - return static_cast*>(impl_->result_.get()); - } + Result* GetResult() const { return impl_->CastResult(); } void SetResult(Result res) { impl_->result_ = {new Result(std::move(res)), diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index 8c1e72a48bd..33796a05bb1 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -27,11 +27,13 @@ #include #include #include +#include #include #include #include +#include "arrow/testing/executor_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/logging.h" @@ -387,10 +389,8 @@ TEST(FutureRefTest, TailRemoved) { TEST(FutureRefTest, HeadRemoved) { // Keeping the tail of the future chain should not keep the entire chain alive. If no - // one has a reference to the head then there is no need to keep it, nothing will finish - // it. In theory the intermediate futures could be finished by some external process - // but that would be highly unusual and bad practice so in reality this would just be a - // reference to a future that will never complete which is ok. + // one has a reference to the head then the future is abandoned. TODO (ARROW-12207): + // detect abandonment. std::weak_ptr ref; std::shared_ptr> ref2; { @@ -952,6 +952,117 @@ TEST(FutureCompletionTest, FutureVoid) { } } +class FutureSchedulingTest : public testing::Test { + public: + internal::Executor* executor() { return mock_executor.get(); } + + int spawn_count() { return static_cast(mock_executor->captured_tasks.size()); } + + void AssertRunSynchronously(const std::vector& ids) { AssertIds(ids, true); } + + void AssertScheduled(const std::vector& ids) { AssertIds(ids, false); } + + void AssertIds(const std::vector& ids, bool should_be_synchronous) { + for (auto id : ids) { + ASSERT_EQ(should_be_synchronous, callbacks_run_synchronously.find(id) != + callbacks_run_synchronously.end()); + } + } + + std::function callback(int id) { + return [this, id](const Status&) { callbacks_run_synchronously.insert(id); }; + } + + std::shared_ptr mock_executor = std::make_shared(); + std::unordered_set callbacks_run_synchronously; +}; + +TEST_F(FutureSchedulingTest, ScheduleNever) { + CallbackOptions options; + options.should_schedule = ShouldSchedule::Never; + options.executor = executor(); + // Successful future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback(1), options); + fut.MarkFinished(); + fut.AddCallback(callback(2), options); + ASSERT_EQ(0, spawn_count()); + AssertRunSynchronously({1, 2}); + } + // Failing future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback(3), options); + fut.MarkFinished(Status::Invalid("XYZ")); + fut.AddCallback(callback(4), options); + ASSERT_EQ(0, spawn_count()); + AssertRunSynchronously({3, 4}); + } +} + +TEST_F(FutureSchedulingTest, ScheduleAlways) { + CallbackOptions options; + options.should_schedule = ShouldSchedule::Always; + options.executor = executor(); + // Successful future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback(1), options); + fut.MarkFinished(); + fut.AddCallback(callback(2), options); + ASSERT_EQ(2, spawn_count()); + AssertScheduled({1, 2}); + } + // Failing future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback(3), options); + fut.MarkFinished(Status::Invalid("XYZ")); + fut.AddCallback(callback(4), options); + ASSERT_EQ(4, spawn_count()); + AssertScheduled({3, 4}); + } +} + +TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) { + CallbackOptions options; + options.should_schedule = ShouldSchedule::IfUnfinished; + options.executor = executor(); + // Successful future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback(1), options); + fut.MarkFinished(); + fut.AddCallback(callback(2), options); + ASSERT_EQ(1, spawn_count()); + AssertRunSynchronously({2}); + AssertScheduled({1}); + } + // Failing future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback(3), options); + fut.MarkFinished(Status::Invalid("XYZ")); + fut.AddCallback(callback(4), options); + ASSERT_EQ(2, spawn_count()); + AssertRunSynchronously({4}); + AssertScheduled({3}); + } +} + +TEST_F(FutureSchedulingTest, ScheduleAlwaysKeepsFutureAliveUntilCallback) { + CallbackOptions options; + options.should_schedule = ShouldSchedule::Always; + options.executor = executor(); + { + auto fut = Future::Make(); + fut.AddCallback([](const Result val) { ASSERT_EQ(7, *val); }, options); + fut.MarkFinished(7); + } + std::move(mock_executor->captured_tasks[0])(); +} + TEST(FutureAllTest, Empty) { auto combined = arrow::All(std::vector>{}); auto after_assert = combined.Then( diff --git a/cpp/src/arrow/util/test_common.h b/cpp/src/arrow/util/test_common.h index 8c304ffbbcf..511daed1eca 100644 --- a/cpp/src/arrow/util/test_common.h +++ b/cpp/src/arrow/util/test_common.h @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#pragma once + #include #include "arrow/testing/gtest_util.h" diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 8626132a348..d012aa02010 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -102,24 +102,24 @@ class ARROW_EXPORT Executor { // The continuations of that future should run on the CPU thread pool keeping // CPU heavy work off the I/O thread pool. So the I/O task should transfer // the future to the CPU executor before returning. - template , typename FTSync = typename FT::SyncType> + // + // By default this method will only transfer if the future is not already completed. If + // the future is already completed then any callback would be run synchronously and so + // no transfer is typically necessary. However, in cases where you want to force a + // transfer (e.g. to help the scheduler break up units of work across multiple cores) + // then you can override this behavior with `always_transfer`. + template Future Transfer(Future future) { - auto transferred = Future::Make(); - auto callback = [this, transferred](const FTSync& result) mutable { - auto spawn_status = - Spawn([transferred, result]() mutable { transferred.MarkFinished(result); }); - if (!spawn_status.ok()) { - transferred.MarkFinished(spawn_status); - } - }; - auto callback_factory = [&callback]() { return callback; }; - if (future.TryAddCallback(callback_factory)) { - return transferred; - } - // If the future is already finished and we aren't going to force spawn a thread - // then we don't need to add another layer of callback and can return the original - // future - return future; + return DoTransfer(std::move(future), false); + } + + // Overload of Transfer which will always schedule callbacks on new threads even if the + // future is finished when the callback is added. + // + // This can be useful in cases where you want to ensure parallelism + template + Future TransferAlways(Future future) { + return DoTransfer(std::move(future), true); } // Submit a callable and arguments for execution. Return a future that @@ -184,6 +184,39 @@ class ARROW_EXPORT Executor { Executor() = default; + template , typename FTSync = typename FT::SyncType> + Future DoTransfer(Future future, bool always_transfer = false) { + auto transferred = Future::Make(); + if (always_transfer) { + CallbackOptions callback_options = CallbackOptions::Defaults(); + callback_options.should_schedule = ShouldSchedule::Always; + callback_options.executor = this; + auto sync_callback = [transferred](const FTSync& result) mutable { + transferred.MarkFinished(result); + }; + future.AddCallback(sync_callback, callback_options); + return transferred; + } + + // We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of + // work by doing the test here. + auto callback = [this, transferred](const FTSync& result) mutable { + auto spawn_status = + Spawn([transferred, result]() mutable { transferred.MarkFinished(result); }); + if (!spawn_status.ok()) { + transferred.MarkFinished(spawn_status); + } + }; + auto callback_factory = [&callback]() { return callback; }; + if (future.TryAddCallback(callback_factory)) { + return transferred; + } + // If the future is already finished and we aren't going to force spawn a thread + // then we don't need to add another layer of callback and can return the original + // future + return future; + } + // Subclassing API virtual Status SpawnReal(TaskHints hints, FnOnce task, StopToken, StopCallback&&) = 0; diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index bac6baf839f..2cfb4c62613 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -32,9 +32,12 @@ #include #include "arrow/status.h" +#include "arrow/testing/executor_util.h" +#include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" #include "arrow/util/macros.h" +#include "arrow/util/test_common.h" #include "arrow/util/thread_pool.h" namespace arrow { @@ -256,6 +259,42 @@ TEST_P(TestRunSynchronously, PropagatedError) { INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously, ::testing::Values(false, true)); +class TransferTest : public testing::Test { + public: + internal::Executor* executor() { return mock_executor.get(); } + int spawn_count() { return mock_executor->spawn_count; } + + std::function callback = [](const Status&) {}; + std::shared_ptr mock_executor = std::make_shared(); +}; + +TEST_F(TransferTest, DefaultTransferIfNotFinished) { + { + Future<> fut = Future<>::Make(); + auto transferred = executor()->Transfer(fut); + fut.MarkFinished(); + ASSERT_FINISHES_OK(transferred); + ASSERT_EQ(1, spawn_count()); + } + { + Future<> fut = Future<>::Make(); + fut.MarkFinished(); + auto transferred = executor()->Transfer(fut); + ASSERT_FINISHES_OK(transferred); + ASSERT_EQ(1, spawn_count()); + } +} + +TEST_F(TransferTest, TransferAlways) { + { + Future<> fut = Future<>::Make(); + fut.MarkFinished(); + auto transferred = executor()->TransferAlways(fut); + ASSERT_FINISHES_OK(transferred); + ASSERT_EQ(1, spawn_count()); + } +} + class TestThreadPool : public ::testing::Test { public: void TearDown() override {