From 1b8a8309bf23e4ed6461e9bed2e828e209317fd8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 24 May 2021 11:01:00 -1000 Subject: [PATCH 1/9] ARROW-12560: Added the ability to specify whether a Future callback should schedule a new thread task. Previously callbacks always ran synchronously. --- cpp/src/arrow/util/future.cc | 58 ++++++++++++++++++----- cpp/src/arrow/util/future.h | 65 ++++++++++++++++++++------ cpp/src/arrow/util/future_test.cc | 54 +++++++++++++++++++++ cpp/src/arrow/util/test_common.h | 15 ++++++ cpp/src/arrow/util/thread_pool.h | 21 ++++++++- cpp/src/arrow/util/thread_pool_test.cc | 38 +++++++++++++++ 6 files changed, 224 insertions(+), 27 deletions(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index c7d7c37ad33..2fff6056607 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -26,6 +26,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" +#include "arrow/util/thread_pool.h" namespace arrow { @@ -231,26 +232,60 @@ class ConcreteFutureImpl : public FutureImpl { void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); } - void AddCallback(Callback callback) { + void CheckOptions(const CallbackOptions& opts) { + if (opts.should_schedule != ShouldSchedule::NEVER) { + DCHECK_NE(opts.executor, NULL) + << "An executor must be specified when adding a callback that might schedule"; + } + } + + void AddCallback(Callback callback, CallbackOptions opts) { + CheckOptions(opts); std::unique_lock lock(mutex_); + CallbackRecord callback_record{std::move(callback), opts}; if (IsFutureFinished(state_)) { lock.unlock(); - std::move(callback)(); + RunOrScheduleCallback(callback_record, /*from_unfinished=*/false); } else { - callbacks_.push_back(std::move(callback)); + callbacks_.push_back(std::move(callback_record)); } } - bool TryAddCallback(const std::function& callback_factory) { + bool TryAddCallback(const std::function& callback_factory, + CallbackOptions opts) { + CheckOptions(opts); std::unique_lock lock(mutex_); if (IsFutureFinished(state_)) { return false; } else { - callbacks_.push_back(callback_factory()); + callbacks_.push_back({callback_factory(), opts}); return true; } } + bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) { + switch (callback_record.options.should_schedule) { + case ShouldSchedule::NEVER: + return false; + case ShouldSchedule::ALWAYS: + return true; + case ShouldSchedule::IF_UNFINISHED: + return from_unfinished; + default: + DCHECK(false) << "Unrecognized ShouldSchedule option"; + return false; + } + } + + void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) { + if (ShouldSchedule(callback_record, from_unfinished)) { + DCHECK_OK( + callback_record.options.executor->Spawn(std::move(callback_record.callback))); + } else { + std::move(callback_record.callback)(); + } + } + void DoMarkFinishedOrFailed(FutureState state) { { // Lock the hypothetical waiter first, and the future after. @@ -272,8 +307,8 @@ class ConcreteFutureImpl : public FutureImpl { // // In fact, it is important not to hold the locks because the callback // may be slow or do its own locking on other resources - for (auto&& callback : callbacks_) { - std::move(callback)(); + for (auto& callback_record : callbacks_) { + RunOrScheduleCallback(callback_record, /*from_unfinished=*/true); } callbacks_.clear(); } @@ -334,12 +369,13 @@ void FutureImpl::MarkFinished() { GetConcreteFuture(this)->DoMarkFinished(); } void FutureImpl::MarkFailed() { GetConcreteFuture(this)->DoMarkFailed(); } -void FutureImpl::AddCallback(Callback callback) { - GetConcreteFuture(this)->AddCallback(std::move(callback)); +void FutureImpl::AddCallback(Callback callback, CallbackOptions opts) { + GetConcreteFuture(this)->AddCallback(std::move(callback), opts); } -bool FutureImpl::TryAddCallback(const std::function& callback_factory) { - return GetConcreteFuture(this)->TryAddCallback(callback_factory); +bool FutureImpl::TryAddCallback(const std::function& callback_factory, + CallbackOptions opts) { + return GetConcreteFuture(this)->TryAddCallback(callback_factory, opts); } Future<> AllComplete(const std::vector>& futures) { diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 132443176ed..17aa8c41239 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -202,6 +202,28 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE }; inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; } +/// \brief Describes whether the callback should be scheduled or run synchronously +enum ShouldSchedule { + /// Always run the callback synchronously (the default) + NEVER = 0, + /// Schedule a new task only if the future is not finished when the + /// callback is added + IF_UNFINISHED = 1, + /// Always schedule the callback as a new task + ALWAYS = 2 +}; + +/// \brief Options that control how a continuation is run +struct CallbackOptions { + /// Describes whether the callback should be run synchronously or scheduled + ShouldSchedule should_schedule = ShouldSchedule::NEVER; + /// If the callback is scheduled then this is the executor it should be scheduled + /// on. If this is NULL then should_schedule must be NEVER + internal::Executor* executor = NULL; + + static CallbackOptions Defaults() { return CallbackOptions(); } +}; + // Untyped private implementation class ARROW_EXPORT FutureImpl { public: @@ -220,8 +242,9 @@ class ARROW_EXPORT FutureImpl { bool Wait(double seconds); using Callback = internal::FnOnce; - void AddCallback(Callback callback); - bool TryAddCallback(const std::function& callback_factory); + void AddCallback(Callback callback, CallbackOptions opts); + bool TryAddCallback(const std::function& callback_factory, + CallbackOptions opts); // Waiter API inline FutureState SetWaiter(FutureWaiter* w, int future_num); @@ -234,7 +257,11 @@ class ARROW_EXPORT FutureImpl { using Storage = std::unique_ptr; Storage result_{NULLPTR, NULLPTR}; - std::vector callbacks_; + struct CallbackRecord { + Callback callback; + CallbackOptions options; + }; + std::vector callbacks_; }; // An object that waits on multiple futures at once. Only one waiter @@ -453,7 +480,8 @@ class Future { /// cyclic reference to itself through the callback. template typename std::enable_if::value>::type - AddCallback(OnComplete on_complete) const { + AddCallback(OnComplete on_complete, + CallbackOptions opts = CallbackOptions::Defaults()) const { // We know impl_ will not be dangling when invoking callbacks because at least one // thread will be waiting for MarkFinished to return. Thus it's safe to keep a // weak reference to impl_ here @@ -462,13 +490,14 @@ class Future { WeakFuture weak_self; OnComplete on_complete; }; - impl_->AddCallback(Callback{WeakFuture(*this), std::move(on_complete)}); + impl_->AddCallback(Callback{WeakFuture(*this), std::move(on_complete)}, opts); } /// Overload for callbacks accepting a Status template typename std::enable_if::value>::type - AddCallback(OnComplete on_complete) const { + AddCallback(OnComplete on_complete, + CallbackOptions opts = CallbackOptions::Defaults()) const { static_assert(std::is_same::value, "Callbacks for Future<> should accept Status and not Result"); struct Callback { @@ -476,7 +505,7 @@ class Future { WeakFuture weak_self; OnComplete on_complete; }; - impl_->AddCallback(Callback{WeakFuture(*this), std::move(on_complete)}); + impl_->AddCallback(Callback{WeakFuture(*this), std::move(on_complete)}, opts); } /// \brief Overload of AddCallback that will return false instead of running @@ -495,30 +524,36 @@ class Future { template > typename std::enable_if::value, bool>::type - TryAddCallback(const CallbackFactory& callback_factory) const { + TryAddCallback(const CallbackFactory& callback_factory, + CallbackOptions opts = CallbackOptions::Defaults()) const { struct Callback { void operator()() && { std::move(on_complete)(weak_self.get().result()); } WeakFuture weak_self; OnComplete on_complete; }; - return impl_->TryAddCallback([this, &callback_factory]() { - return Callback{WeakFuture(*this), callback_factory()}; - }); + return impl_->TryAddCallback( + [this, &callback_factory]() { + return Callback{WeakFuture(*this), callback_factory()}; + }, + opts); } template > typename std::enable_if::value, bool>::type - TryAddCallback(const CallbackFactory& callback_factory) const { + TryAddCallback(const CallbackFactory& callback_factory, + CallbackOptions opts = CallbackOptions::Defaults()) const { struct Callback { void operator()() && { std::move(on_complete)(weak_self.get().status()); } WeakFuture weak_self; OnComplete on_complete; }; - return impl_->TryAddCallback([this, &callback_factory]() { - return Callback{WeakFuture(*this), callback_factory()}; - }); + return impl_->TryAddCallback( + [this, &callback_factory]() { + return Callback{WeakFuture(*this), callback_factory()}; + }, + opts); } /// \brief Consumer API: Register a continuation to run when this future completes diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index 8c1e72a48bd..0b55ce8dc93 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -35,6 +35,7 @@ #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/logging.h" +#include "arrow/util/test_common.h" #include "arrow/util/thread_pool.h" namespace arrow { @@ -952,6 +953,59 @@ TEST(FutureCompletionTest, FutureVoid) { } } +class FutureSchedulingTest : public testing::Test { + public: + internal::Executor* executor() { return mock_executor.get(); } + int spawn_count() { return mock_executor->spawn_count; } + + std::function callback = [](const Status&) {}; + std::shared_ptr mock_executor = std::make_shared(); +}; + +TEST_F(FutureSchedulingTest, ScheduleAlways) { + CallbackOptions options; + options.should_schedule = ShouldSchedule::ALWAYS; + options.executor = executor(); + // Successful future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback, options); + fut.MarkFinished(); + fut.AddCallback(callback, options); + ASSERT_EQ(2, spawn_count()); + } + // Failing future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback, options); + fut.MarkFinished(Status::Invalid("XYZ")); + fut.AddCallback(callback, options); + ASSERT_EQ(4, spawn_count()); + } +} + +TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) { + CallbackOptions options; + options.should_schedule = ShouldSchedule::IF_UNFINISHED; + options.executor = executor(); + // Successful future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback, options); + fut.MarkFinished(); + fut.AddCallback(callback, options); + ASSERT_EQ(1, spawn_count()); + } + // Failing future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback, options); + fut.MarkFinished(Status::Invalid("XYZ")); + fut.AddCallback(callback, options); + ASSERT_EQ(2, spawn_count()); + } +} + TEST(FutureAllTest, Empty) { auto combined = arrow::All(std::vector>{}); auto after_assert = combined.Then( diff --git a/cpp/src/arrow/util/test_common.h b/cpp/src/arrow/util/test_common.h index 8c304ffbbcf..edf666dc2e1 100644 --- a/cpp/src/arrow/util/test_common.h +++ b/cpp/src/arrow/util/test_common.h @@ -19,6 +19,7 @@ #include "arrow/testing/gtest_util.h" #include "arrow/util/iterator.h" +#include "arrow/util/thread_pool.h" namespace arrow { @@ -85,4 +86,18 @@ inline void AssertIteratorExhausted(Iterator& it) { Transformer MakeFilter(std::function filter); +class MockExecutor : public internal::Executor { + public: + int GetCapacity() override { return 0; } + + Status SpawnReal(internal::TaskHints hints, internal::FnOnce task, StopToken, + StopCallback&&) override { + spawn_count++; + std::move(task)(); + return Status::OK(); + } + + int spawn_count = 0; +}; + } // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 8626132a348..0428f5a804d 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -102,9 +102,28 @@ class ARROW_EXPORT Executor { // The continuations of that future should run on the CPU thread pool keeping // CPU heavy work off the I/O thread pool. So the I/O task should transfer // the future to the CPU executor before returning. + // + // By default this method will only transfer if the future is not already completed. If + // the future is already completed then any callback would be run synchronously and so + // no transfer is typically necessary. However, in cases where you want to force a + // transfer (e.g. to help the scheduler break up units of work across multiple cores) + // then you can override this behavior with `always_transfer`. template , typename FTSync = typename FT::SyncType> - Future Transfer(Future future) { + Future Transfer(Future future, bool always_transfer = false) { auto transferred = Future::Make(); + if (always_transfer) { + CallbackOptions callback_options = CallbackOptions::Defaults(); + callback_options.should_schedule = ShouldSchedule::ALWAYS; + callback_options.executor = this; + auto sync_callback = [transferred](const FTSync& result) mutable { + transferred.MarkFinished(result); + }; + future.AddCallback(sync_callback, callback_options); + return transferred; + } + + // We could use AddCallback's ShouldSchedule::IF_UNFINISHED but we can save a bit of + // work by doing the test here. auto callback = [this, transferred](const FTSync& result) mutable { auto spawn_status = Spawn([transferred, result]() mutable { transferred.MarkFinished(result); }); diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index bac6baf839f..aabab1108d3 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -32,9 +32,11 @@ #include #include "arrow/status.h" +#include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" #include "arrow/util/macros.h" +#include "arrow/util/test_common.h" #include "arrow/util/thread_pool.h" namespace arrow { @@ -256,6 +258,42 @@ TEST_P(TestRunSynchronously, PropagatedError) { INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously, ::testing::Values(false, true)); +class TransferTest : public testing::Test { + public: + internal::Executor* executor() { return mock_executor.get(); } + int spawn_count() { return mock_executor->spawn_count; } + + std::function callback = [](const Status&) {}; + std::shared_ptr mock_executor = std::make_shared(); +}; + +TEST_F(TransferTest, DefaultTransferIfNotFinished) { + { + Future<> fut = Future<>::Make(); + auto transferred = executor()->Transfer(fut); + fut.MarkFinished(); + ASSERT_FINISHES_OK(transferred); + ASSERT_EQ(1, spawn_count()); + } + { + Future<> fut = Future<>::Make(); + fut.MarkFinished(); + auto transferred = executor()->Transfer(fut); + ASSERT_FINISHES_OK(transferred); + ASSERT_EQ(1, spawn_count()); + } +} + +TEST_F(TransferTest, TransferAlways) { + { + Future<> fut = Future<>::Make(); + fut.MarkFinished(); + auto transferred = executor()->Transfer(fut, /*always_transfer=*/true); + ASSERT_FINISHES_OK(transferred); + ASSERT_EQ(1, spawn_count()); + } +} + class TestThreadPool : public ::testing::Test { public: void TearDown() override { From 30e1c6502c619c2a703a03ebf49c459549d60cff Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 24 May 2021 12:04:45 -1000 Subject: [PATCH 2/9] ARROW-12560: Callbacks scheduled using ScheduleAlways run after MarkFinished has completed so the previous 'rely on WeakFuture being valid' trick no longer worked --- cpp/src/arrow/util/future.cc | 14 ++++++++--- cpp/src/arrow/util/future.h | 40 +++++++++++++++---------------- cpp/src/arrow/util/future_test.cc | 32 +++++++++++++++++++++---- 3 files changed, 59 insertions(+), 27 deletions(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index 2fff6056607..63a3340a368 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -279,10 +279,18 @@ class ConcreteFutureImpl : public FutureImpl { void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) { if (ShouldSchedule(callback_record, from_unfinished)) { - DCHECK_OK( - callback_record.options.executor->Spawn(std::move(callback_record.callback))); + // Need to make a copy of this to keep it alive until the callback has a chance + // to be scheduled. + struct CallbackTask { + void operator()() { std::move(callback)(*self); } + + Callback callback; + std::shared_ptr self; + }; + CallbackTask task{std::move(callback_record.callback), shared_from_this()}; + DCHECK_OK(callback_record.options.executor->Spawn(std::move(task))); } else { - std::move(callback_record.callback)(); + std::move(callback_record.callback)(*this); } } diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 17aa8c41239..1c76481e404 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -225,7 +225,7 @@ struct CallbackOptions { }; // Untyped private implementation -class ARROW_EXPORT FutureImpl { +class ARROW_EXPORT FutureImpl : public std::enable_shared_from_this { public: FutureImpl(); virtual ~FutureImpl() = default; @@ -241,7 +241,7 @@ class ARROW_EXPORT FutureImpl { void Wait(); bool Wait(double seconds); - using Callback = internal::FnOnce; + using Callback = internal::FnOnce; void AddCallback(Callback callback, CallbackOptions opts); bool TryAddCallback(const std::function& callback_factory, CallbackOptions opts); @@ -486,11 +486,12 @@ class Future { // thread will be waiting for MarkFinished to return. Thus it's safe to keep a // weak reference to impl_ here struct Callback { - void operator()() && { std::move(on_complete)(weak_self.get().result()); } - WeakFuture weak_self; + void operator()(const FutureImpl& impl) && { + std::move(on_complete)(*static_cast*>(impl.result_.get())); + } OnComplete on_complete; }; - impl_->AddCallback(Callback{WeakFuture(*this), std::move(on_complete)}, opts); + impl_->AddCallback(Callback{std::move(on_complete)}, opts); } /// Overload for callbacks accepting a Status @@ -501,11 +502,13 @@ class Future { static_assert(std::is_same::value, "Callbacks for Future<> should accept Status and not Result"); struct Callback { - void operator()() && { std::move(on_complete)(weak_self.get().status()); } - WeakFuture weak_self; + void operator()(const FutureImpl& impl) && { + std::move(on_complete)( + static_cast*>(impl.result_.get())->status()); + } OnComplete on_complete; }; - impl_->AddCallback(Callback{WeakFuture(*this), std::move(on_complete)}, opts); + impl_->AddCallback(Callback{std::move(on_complete)}, opts); } /// \brief Overload of AddCallback that will return false instead of running @@ -527,15 +530,13 @@ class Future { TryAddCallback(const CallbackFactory& callback_factory, CallbackOptions opts = CallbackOptions::Defaults()) const { struct Callback { - void operator()() && { std::move(on_complete)(weak_self.get().result()); } - WeakFuture weak_self; + void operator()(const FutureImpl& impl) && { + std::move(on_complete)(*static_cast*>(impl.result_.get())); + } OnComplete on_complete; }; return impl_->TryAddCallback( - [this, &callback_factory]() { - return Callback{WeakFuture(*this), callback_factory()}; - }, - opts); + [this, &callback_factory]() { return Callback{callback_factory()}; }, opts); } template weak_self; + void operator()(const FutureImpl& impl) && { + std::move(on_complete)( + static_cast*>(impl.result_.get())->status()); + } OnComplete on_complete; }; return impl_->TryAddCallback( - [this, &callback_factory]() { - return Callback{WeakFuture(*this), callback_factory()}; - }, - opts); + [this, &callback_factory]() { return Callback{callback_factory()}; }, opts); } /// \brief Consumer API: Register a continuation to run when this future completes diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index 0b55ce8dc93..0188149c8d9 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -388,10 +388,8 @@ TEST(FutureRefTest, TailRemoved) { TEST(FutureRefTest, HeadRemoved) { // Keeping the tail of the future chain should not keep the entire chain alive. If no - // one has a reference to the head then there is no need to keep it, nothing will finish - // it. In theory the intermediate futures could be finished by some external process - // but that would be highly unusual and bad practice so in reality this would just be a - // reference to a future that will never complete which is ok. + // one has a reference to the head then the future is abandoned. TODO (ARROW-12207): + // detect abandonment. std::weak_ptr ref; std::shared_ptr> ref2; { @@ -1006,6 +1004,32 @@ TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) { } } +class DelayedExecutor : public internal::Executor { + public: + int GetCapacity() override { return 0; } + + Status SpawnReal(internal::TaskHints hints, internal::FnOnce task, StopToken, + StopCallback&&) override { + captured_task = std::move(task); + return Status::OK(); + } + + internal::FnOnce captured_task; +}; + +TEST_F(FutureSchedulingTest, ScheduleAlwaysKeepsFutureAliveUntilCallback) { + CallbackOptions options; + options.should_schedule = ShouldSchedule::ALWAYS; + DelayedExecutor delayed; + options.executor = &delayed; + { + auto fut = Future::Make(); + fut.AddCallback([](const Result val) { ASSERT_EQ(7, *val); }, options); + fut.MarkFinished(7); + } + std::move(delayed.captured_task)(); +} + TEST(FutureAllTest, Empty) { auto combined = arrow::All(std::vector>{}); auto after_assert = combined.Then( From db73b85e29c1e03913575a046bcf41e063b8b38a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 24 May 2021 12:13:09 -1000 Subject: [PATCH 3/9] ARROW-12560: Lint --- cpp/src/arrow/util/future.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 1c76481e404..185c7e35757 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -536,7 +536,7 @@ class Future { OnComplete on_complete; }; return impl_->TryAddCallback( - [this, &callback_factory]() { return Callback{callback_factory()}; }, opts); + [&callback_factory]() { return Callback{callback_factory()}; }, opts); } template TryAddCallback( - [this, &callback_factory]() { return Callback{callback_factory()}; }, opts); + [&callback_factory]() { return Callback{callback_factory()}; }, opts); } /// \brief Consumer API: Register a continuation to run when this future completes From 689d9585af033886efebd5b35ae5d1fe2dc96ed5 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 24 May 2021 12:45:04 -1000 Subject: [PATCH 4/9] ARROW-12560: Build errors on Windows --- cpp/src/arrow/util/test_common.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/util/test_common.h b/cpp/src/arrow/util/test_common.h index edf666dc2e1..3f6c0855bfd 100644 --- a/cpp/src/arrow/util/test_common.h +++ b/cpp/src/arrow/util/test_common.h @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#pragma once + #include #include "arrow/testing/gtest_util.h" From 4174fa5e9427f140014ba3323b7b572d7267a434 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 2 Jun 2021 03:23:03 -1000 Subject: [PATCH 5/9] ARROW-12560: WIP --- cpp/src/arrow/util/future.cc | 20 ++++++++++---------- cpp/src/arrow/util/future.h | 12 ++++++------ cpp/src/arrow/util/future_test.cc | 6 +++--- cpp/src/arrow/util/thread_pool.h | 4 ++-- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index 63a3340a368..4c2282f876d 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -233,8 +233,8 @@ class ConcreteFutureImpl : public FutureImpl { void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); } void CheckOptions(const CallbackOptions& opts) { - if (opts.should_schedule != ShouldSchedule::NEVER) { - DCHECK_NE(opts.executor, NULL) + if (opts.should_schedule != ShouldSchedule::Never) { + DCHECK_NE(opts.executor, nullptr) << "An executor must be specified when adding a callback that might schedule"; } } @@ -245,7 +245,7 @@ class ConcreteFutureImpl : public FutureImpl { CallbackRecord callback_record{std::move(callback), opts}; if (IsFutureFinished(state_)) { lock.unlock(); - RunOrScheduleCallback(callback_record, /*from_unfinished=*/false); + RunOrScheduleCallback(std::move(callback_record), /*from_unfinished=*/false); } else { callbacks_.push_back(std::move(callback_record)); } @@ -265,11 +265,11 @@ class ConcreteFutureImpl : public FutureImpl { bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) { switch (callback_record.options.should_schedule) { - case ShouldSchedule::NEVER: + case ShouldSchedule::Never: return false; - case ShouldSchedule::ALWAYS: + case ShouldSchedule::Always: return true; - case ShouldSchedule::IF_UNFINISHED: + case ShouldSchedule::IfUnfinished: return from_unfinished; default: DCHECK(false) << "Unrecognized ShouldSchedule option"; @@ -277,16 +277,16 @@ class ConcreteFutureImpl : public FutureImpl { } } - void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) { + void RunOrScheduleCallback(CallbackRecord&& callback_record, bool from_unfinished) { if (ShouldSchedule(callback_record, from_unfinished)) { - // Need to make a copy of this to keep it alive until the callback has a chance - // to be scheduled. struct CallbackTask { void operator()() { std::move(callback)(*self); } Callback callback; std::shared_ptr self; }; + // Need to make a copy of `this` via `shared_from_this` to keep it alive until the + // callback has a chance to be scheduled. CallbackTask task{std::move(callback_record.callback), shared_from_this()}; DCHECK_OK(callback_record.options.executor->Spawn(std::move(task))); } else { @@ -316,7 +316,7 @@ class ConcreteFutureImpl : public FutureImpl { // In fact, it is important not to hold the locks because the callback // may be slow or do its own locking on other resources for (auto& callback_record : callbacks_) { - RunOrScheduleCallback(callback_record, /*from_unfinished=*/true); + RunOrScheduleCallback(std::move(callback_record), /*from_unfinished=*/true); } callbacks_.clear(); } diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 185c7e35757..23eacd78e4a 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -202,23 +202,23 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE }; inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; } -/// \brief Describes whether the callback should be scheduled or run synchronously +/// \brief Describe whether the callback should be scheduled or run synchronously enum ShouldSchedule { /// Always run the callback synchronously (the default) - NEVER = 0, + Never = 0, /// Schedule a new task only if the future is not finished when the /// callback is added - IF_UNFINISHED = 1, + IfUnfinished = 1, /// Always schedule the callback as a new task - ALWAYS = 2 + Always = 2 }; /// \brief Options that control how a continuation is run struct CallbackOptions { /// Describes whether the callback should be run synchronously or scheduled - ShouldSchedule should_schedule = ShouldSchedule::NEVER; + ShouldSchedule should_schedule = ShouldSchedule::Never; /// If the callback is scheduled then this is the executor it should be scheduled - /// on. If this is NULL then should_schedule must be NEVER + /// on. If this is NULL then should_schedule must be Never internal::Executor* executor = NULL; static CallbackOptions Defaults() { return CallbackOptions(); } diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index 0188149c8d9..2a87390243f 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -962,7 +962,7 @@ class FutureSchedulingTest : public testing::Test { TEST_F(FutureSchedulingTest, ScheduleAlways) { CallbackOptions options; - options.should_schedule = ShouldSchedule::ALWAYS; + options.should_schedule = ShouldSchedule::Always; options.executor = executor(); // Successful future { @@ -984,7 +984,7 @@ TEST_F(FutureSchedulingTest, ScheduleAlways) { TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) { CallbackOptions options; - options.should_schedule = ShouldSchedule::IF_UNFINISHED; + options.should_schedule = ShouldSchedule::IfUnfinished; options.executor = executor(); // Successful future { @@ -1019,7 +1019,7 @@ class DelayedExecutor : public internal::Executor { TEST_F(FutureSchedulingTest, ScheduleAlwaysKeepsFutureAliveUntilCallback) { CallbackOptions options; - options.should_schedule = ShouldSchedule::ALWAYS; + options.should_schedule = ShouldSchedule::Always; DelayedExecutor delayed; options.executor = &delayed; { diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 0428f5a804d..c6bcfb45a0b 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -113,7 +113,7 @@ class ARROW_EXPORT Executor { auto transferred = Future::Make(); if (always_transfer) { CallbackOptions callback_options = CallbackOptions::Defaults(); - callback_options.should_schedule = ShouldSchedule::ALWAYS; + callback_options.should_schedule = ShouldSchedule::Always; callback_options.executor = this; auto sync_callback = [transferred](const FTSync& result) mutable { transferred.MarkFinished(result); @@ -122,7 +122,7 @@ class ARROW_EXPORT Executor { return transferred; } - // We could use AddCallback's ShouldSchedule::IF_UNFINISHED but we can save a bit of + // We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of // work by doing the test here. auto callback = [this, transferred](const FTSync& result) mutable { auto spawn_status = From 2ceb1a5169bf5473b94df6e657ef849291f9ff5c Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 4 Jun 2021 08:43:13 -1000 Subject: [PATCH 6/9] ARROW-12560: Addressing PR comments --- cpp/src/arrow/testing/executor_util.h | 55 +++++++++++++++++ cpp/src/arrow/util/future.cc | 15 +++-- cpp/src/arrow/util/future.h | 15 ++--- cpp/src/arrow/util/future_test.cc | 84 +++++++++++++++++--------- cpp/src/arrow/util/test_common.h | 14 ----- cpp/src/arrow/util/thread_pool.h | 74 ++++++++++++++--------- cpp/src/arrow/util/thread_pool_test.cc | 3 +- 7 files changed, 173 insertions(+), 87 deletions(-) create mode 100644 cpp/src/arrow/testing/executor_util.h diff --git a/cpp/src/arrow/testing/executor_util.h b/cpp/src/arrow/testing/executor_util.h new file mode 100644 index 00000000000..e34fc858d07 --- /dev/null +++ b/cpp/src/arrow/testing/executor_util.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/util/thread_pool.h" + +namespace arrow { + +/// An executor which synchronously runs the task as part of the SpawnReal call. +class MockExecutor : public internal::Executor { + public: + int GetCapacity() override { return 0; } + + Status SpawnReal(internal::TaskHints hints, internal::FnOnce task, StopToken, + StopCallback&&) override { + spawn_count++; + std::move(task)(); + return Status::OK(); + } + + int spawn_count = 0; +}; + +/// An executor which does not actually run the task. Can be used to simulate situations +/// where the executor schedules a task in a long queue and doesn't get around to running +/// it for a while +class DelayedExecutor : public internal::Executor { + public: + int GetCapacity() override { return 0; } + + Status SpawnReal(internal::TaskHints hints, internal::FnOnce task, StopToken, + StopCallback&&) override { + captured_tasks.push_back(std::move(task)); + return Status::OK(); + } + + std::vector> captured_tasks; +}; + +} // namespace arrow diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index 4c2282f876d..59fa4001d18 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -245,7 +245,7 @@ class ConcreteFutureImpl : public FutureImpl { CallbackRecord callback_record{std::move(callback), opts}; if (IsFutureFinished(state_)) { lock.unlock(); - RunOrScheduleCallback(std::move(callback_record), /*from_unfinished=*/false); + RunOrScheduleCallback(std::move(callback_record), /*in_add_callback=*/true); } else { callbacks_.push_back(std::move(callback_record)); } @@ -263,30 +263,29 @@ class ConcreteFutureImpl : public FutureImpl { } } - bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) { + bool ShouldSchedule(const CallbackRecord& callback_record, bool in_add_callback) { switch (callback_record.options.should_schedule) { case ShouldSchedule::Never: return false; case ShouldSchedule::Always: return true; case ShouldSchedule::IfUnfinished: - return from_unfinished; + return !in_add_callback; default: DCHECK(false) << "Unrecognized ShouldSchedule option"; return false; } } - void RunOrScheduleCallback(CallbackRecord&& callback_record, bool from_unfinished) { - if (ShouldSchedule(callback_record, from_unfinished)) { + void RunOrScheduleCallback(CallbackRecord&& callback_record, bool in_add_callback) { + if (ShouldSchedule(callback_record, in_add_callback)) { struct CallbackTask { void operator()() { std::move(callback)(*self); } Callback callback; std::shared_ptr self; }; - // Need to make a copy of `this` via `shared_from_this` to keep it alive until the - // callback has a chance to be scheduled. + // Need to keep `this` alive until the callback has a chance to be scheduled. CallbackTask task{std::move(callback_record.callback), shared_from_this()}; DCHECK_OK(callback_record.options.executor->Spawn(std::move(task))); } else { @@ -316,7 +315,7 @@ class ConcreteFutureImpl : public FutureImpl { // In fact, it is important not to hold the locks because the callback // may be slow or do its own locking on other resources for (auto& callback_record : callbacks_) { - RunOrScheduleCallback(std::move(callback_record), /*from_unfinished=*/true); + RunOrScheduleCallback(std::move(callback_record), /*in_add_callback=*/false); } callbacks_.clear(); } diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 23eacd78e4a..e54df82f173 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -215,7 +215,7 @@ enum ShouldSchedule { /// \brief Options that control how a continuation is run struct CallbackOptions { - /// Describes whether the callback should be run synchronously or scheduled + /// Describe whether the callback should be run synchronously or scheduled ShouldSchedule should_schedule = ShouldSchedule::Never; /// If the callback is scheduled then this is the executor it should be scheduled /// on. If this is NULL then should_schedule must be Never @@ -240,6 +240,10 @@ class ARROW_EXPORT FutureImpl : public std::enable_shared_from_this void MarkFailed(); void Wait(); bool Wait(double seconds); + template + Result* CastResult() const { + return static_cast*>(result_.get()); + } using Callback = internal::FnOnce; void AddCallback(Callback callback, CallbackOptions opts); @@ -487,7 +491,7 @@ class Future { // weak reference to impl_ here struct Callback { void operator()(const FutureImpl& impl) && { - std::move(on_complete)(*static_cast*>(impl.result_.get())); + std::move(on_complete)(*impl.CastResult()); } OnComplete on_complete; }; @@ -503,8 +507,7 @@ class Future { "Callbacks for Future<> should accept Status and not Result"); struct Callback { void operator()(const FutureImpl& impl) && { - std::move(on_complete)( - static_cast*>(impl.result_.get())->status()); + std::move(on_complete)(impl.CastResult()->status()); } OnComplete on_complete; }; @@ -731,9 +734,7 @@ class Future { void Initialize() { impl_ = FutureImpl::Make(); } - Result* GetResult() const { - return static_cast*>(impl_->result_.get()); - } + Result* GetResult() const { return impl_->CastResult(); } void SetResult(Result res) { impl_->result_ = {new Result(std::move(res)), diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index 2a87390243f..22cb397950e 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -27,11 +27,13 @@ #include #include #include +#include #include #include #include +#include "arrow/testing/executor_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/logging.h" @@ -954,12 +956,48 @@ TEST(FutureCompletionTest, FutureVoid) { class FutureSchedulingTest : public testing::Test { public: internal::Executor* executor() { return mock_executor.get(); } - int spawn_count() { return mock_executor->spawn_count; } + int spawn_count() { return mock_executor->captured_tasks.size(); } + void AssertRunSynchronously(const std::vector& ids) { AssertIds(ids, true); } + void AssertScheduled(const std::vector& ids) { AssertIds(ids, false); } + void AssertIds(const std::vector& ids, bool should_be_synchronous) { + for (auto id : ids) { + ASSERT_EQ(should_be_synchronous, callbacks_run_synchronously.find(id) != + callbacks_run_synchronously.end()); + } + } + + std::function callback(int id) { + return [this, id](const Status&) { callbacks_run_synchronously.insert(id); }; + } - std::function callback = [](const Status&) {}; - std::shared_ptr mock_executor = std::make_shared(); + std::shared_ptr mock_executor = std::make_shared(); + std::unordered_set callbacks_run_synchronously; }; +TEST_F(FutureSchedulingTest, ScheduleNever) { + CallbackOptions options; + options.should_schedule = ShouldSchedule::Never; + options.executor = executor(); + // Successful future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback(1), options); + fut.MarkFinished(); + fut.AddCallback(callback(2), options); + ASSERT_EQ(0, spawn_count()); + AssertRunSynchronously({1, 2}); + } + // Failing future + { + auto fut = Future<>::Make(); + fut.AddCallback(callback(3), options); + fut.MarkFinished(Status::Invalid("XYZ")); + fut.AddCallback(callback(4), options); + ASSERT_EQ(0, spawn_count()); + AssertRunSynchronously({3, 4}); + } +} + TEST_F(FutureSchedulingTest, ScheduleAlways) { CallbackOptions options; options.should_schedule = ShouldSchedule::Always; @@ -967,18 +1005,20 @@ TEST_F(FutureSchedulingTest, ScheduleAlways) { // Successful future { auto fut = Future<>::Make(); - fut.AddCallback(callback, options); + fut.AddCallback(callback(1), options); fut.MarkFinished(); - fut.AddCallback(callback, options); + fut.AddCallback(callback(2), options); ASSERT_EQ(2, spawn_count()); + AssertScheduled({1, 2}); } // Failing future { auto fut = Future<>::Make(); - fut.AddCallback(callback, options); + fut.AddCallback(callback(3), options); fut.MarkFinished(Status::Invalid("XYZ")); - fut.AddCallback(callback, options); + fut.AddCallback(callback(4), options); ASSERT_EQ(4, spawn_count()); + AssertScheduled({3, 4}); } } @@ -989,45 +1029,35 @@ TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) { // Successful future { auto fut = Future<>::Make(); - fut.AddCallback(callback, options); + fut.AddCallback(callback(1), options); fut.MarkFinished(); - fut.AddCallback(callback, options); + fut.AddCallback(callback(2), options); ASSERT_EQ(1, spawn_count()); + AssertRunSynchronously({2}); + AssertScheduled({1}); } // Failing future { auto fut = Future<>::Make(); - fut.AddCallback(callback, options); + fut.AddCallback(callback(3), options); fut.MarkFinished(Status::Invalid("XYZ")); - fut.AddCallback(callback, options); + fut.AddCallback(callback(4), options); ASSERT_EQ(2, spawn_count()); + AssertRunSynchronously({4}); + AssertScheduled({3}); } } -class DelayedExecutor : public internal::Executor { - public: - int GetCapacity() override { return 0; } - - Status SpawnReal(internal::TaskHints hints, internal::FnOnce task, StopToken, - StopCallback&&) override { - captured_task = std::move(task); - return Status::OK(); - } - - internal::FnOnce captured_task; -}; - TEST_F(FutureSchedulingTest, ScheduleAlwaysKeepsFutureAliveUntilCallback) { CallbackOptions options; options.should_schedule = ShouldSchedule::Always; - DelayedExecutor delayed; - options.executor = &delayed; + options.executor = executor(); { auto fut = Future::Make(); fut.AddCallback([](const Result val) { ASSERT_EQ(7, *val); }, options); fut.MarkFinished(7); } - std::move(delayed.captured_task)(); + std::move(mock_executor->captured_tasks[0])(); } TEST(FutureAllTest, Empty) { diff --git a/cpp/src/arrow/util/test_common.h b/cpp/src/arrow/util/test_common.h index 3f6c0855bfd..57ca5686e27 100644 --- a/cpp/src/arrow/util/test_common.h +++ b/cpp/src/arrow/util/test_common.h @@ -88,18 +88,4 @@ inline void AssertIteratorExhausted(Iterator& it) { Transformer MakeFilter(std::function filter); -class MockExecutor : public internal::Executor { - public: - int GetCapacity() override { return 0; } - - Status SpawnReal(internal::TaskHints hints, internal::FnOnce task, StopToken, - StopCallback&&) override { - spawn_count++; - std::move(task)(); - return Status::OK(); - } - - int spawn_count = 0; -}; - } // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index c6bcfb45a0b..d012aa02010 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -108,37 +108,18 @@ class ARROW_EXPORT Executor { // no transfer is typically necessary. However, in cases where you want to force a // transfer (e.g. to help the scheduler break up units of work across multiple cores) // then you can override this behavior with `always_transfer`. - template , typename FTSync = typename FT::SyncType> - Future Transfer(Future future, bool always_transfer = false) { - auto transferred = Future::Make(); - if (always_transfer) { - CallbackOptions callback_options = CallbackOptions::Defaults(); - callback_options.should_schedule = ShouldSchedule::Always; - callback_options.executor = this; - auto sync_callback = [transferred](const FTSync& result) mutable { - transferred.MarkFinished(result); - }; - future.AddCallback(sync_callback, callback_options); - return transferred; - } + template + Future Transfer(Future future) { + return DoTransfer(std::move(future), false); + } - // We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of - // work by doing the test here. - auto callback = [this, transferred](const FTSync& result) mutable { - auto spawn_status = - Spawn([transferred, result]() mutable { transferred.MarkFinished(result); }); - if (!spawn_status.ok()) { - transferred.MarkFinished(spawn_status); - } - }; - auto callback_factory = [&callback]() { return callback; }; - if (future.TryAddCallback(callback_factory)) { - return transferred; - } - // If the future is already finished and we aren't going to force spawn a thread - // then we don't need to add another layer of callback and can return the original - // future - return future; + // Overload of Transfer which will always schedule callbacks on new threads even if the + // future is finished when the callback is added. + // + // This can be useful in cases where you want to ensure parallelism + template + Future TransferAlways(Future future) { + return DoTransfer(std::move(future), true); } // Submit a callable and arguments for execution. Return a future that @@ -203,6 +184,39 @@ class ARROW_EXPORT Executor { Executor() = default; + template , typename FTSync = typename FT::SyncType> + Future DoTransfer(Future future, bool always_transfer = false) { + auto transferred = Future::Make(); + if (always_transfer) { + CallbackOptions callback_options = CallbackOptions::Defaults(); + callback_options.should_schedule = ShouldSchedule::Always; + callback_options.executor = this; + auto sync_callback = [transferred](const FTSync& result) mutable { + transferred.MarkFinished(result); + }; + future.AddCallback(sync_callback, callback_options); + return transferred; + } + + // We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of + // work by doing the test here. + auto callback = [this, transferred](const FTSync& result) mutable { + auto spawn_status = + Spawn([transferred, result]() mutable { transferred.MarkFinished(result); }); + if (!spawn_status.ok()) { + transferred.MarkFinished(spawn_status); + } + }; + auto callback_factory = [&callback]() { return callback; }; + if (future.TryAddCallback(callback_factory)) { + return transferred; + } + // If the future is already finished and we aren't going to force spawn a thread + // then we don't need to add another layer of callback and can return the original + // future + return future; + } + // Subclassing API virtual Status SpawnReal(TaskHints hints, FnOnce task, StopToken, StopCallback&&) = 0; diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index aabab1108d3..2cfb4c62613 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -32,6 +32,7 @@ #include #include "arrow/status.h" +#include "arrow/testing/executor_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" @@ -288,7 +289,7 @@ TEST_F(TransferTest, TransferAlways) { { Future<> fut = Future<>::Make(); fut.MarkFinished(); - auto transferred = executor()->Transfer(fut, /*always_transfer=*/true); + auto transferred = executor()->TransferAlways(fut); ASSERT_FINISHES_OK(transferred); ASSERT_EQ(1, spawn_count()); } From 7297aed2337507e40f0bf53774e2c8341cc3b1bf Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 4 Jun 2021 10:06:45 -1000 Subject: [PATCH 7/9] ARROW-12560: Moved ShouldSchedule to a scoped enum. Fix compiler warning. --- cpp/src/arrow/util/future.h | 2 +- cpp/src/arrow/util/future_test.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index e54df82f173..5fb17f95f2b 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -203,7 +203,7 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE }; inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; } /// \brief Describe whether the callback should be scheduled or run synchronously -enum ShouldSchedule { +enum class ShouldSchedule { /// Always run the callback synchronously (the default) Never = 0, /// Schedule a new task only if the future is not finished when the diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index 22cb397950e..aa6dadd966d 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -956,7 +956,7 @@ TEST(FutureCompletionTest, FutureVoid) { class FutureSchedulingTest : public testing::Test { public: internal::Executor* executor() { return mock_executor.get(); } - int spawn_count() { return mock_executor->captured_tasks.size(); } + int spawn_count() { return static_cast(mock_executor->captured_tasks.size()); } void AssertRunSynchronously(const std::vector& ids) { AssertIds(ids, true); } void AssertScheduled(const std::vector& ids) { AssertIds(ids, false); } void AssertIds(const std::vector& ids, bool should_be_synchronous) { From 85418af1173d61e2213079cbea1bbb9e7ffa7508 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 4 Jun 2021 15:09:36 -1000 Subject: [PATCH 8/9] ARROW-12560: Mingw was getting confused and allowing a method name to shadow a typed enum --- cpp/src/arrow/util/future.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index 59fa4001d18..b329f99ed17 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -263,7 +263,8 @@ class ConcreteFutureImpl : public FutureImpl { } } - bool ShouldSchedule(const CallbackRecord& callback_record, bool in_add_callback) { + bool ShouldScheduleCallback(const CallbackRecord& callback_record, + bool in_add_callback) { switch (callback_record.options.should_schedule) { case ShouldSchedule::Never: return false; @@ -278,7 +279,7 @@ class ConcreteFutureImpl : public FutureImpl { } void RunOrScheduleCallback(CallbackRecord&& callback_record, bool in_add_callback) { - if (ShouldSchedule(callback_record, in_add_callback)) { + if (ShouldScheduleCallback(callback_record, in_add_callback)) { struct CallbackTask { void operator()() { std::move(callback)(*self); } From 747c49873b23082d79aa10dc20b872e21f1c6d49 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 7 Jun 2021 15:30:15 +0200 Subject: [PATCH 9/9] Remove unused includes --- cpp/src/arrow/util/future_test.cc | 5 ++++- cpp/src/arrow/util/test_common.h | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index aa6dadd966d..33796a05bb1 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -37,7 +37,6 @@ #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/logging.h" -#include "arrow/util/test_common.h" #include "arrow/util/thread_pool.h" namespace arrow { @@ -956,9 +955,13 @@ TEST(FutureCompletionTest, FutureVoid) { class FutureSchedulingTest : public testing::Test { public: internal::Executor* executor() { return mock_executor.get(); } + int spawn_count() { return static_cast(mock_executor->captured_tasks.size()); } + void AssertRunSynchronously(const std::vector& ids) { AssertIds(ids, true); } + void AssertScheduled(const std::vector& ids) { AssertIds(ids, false); } + void AssertIds(const std::vector& ids, bool should_be_synchronous) { for (auto id : ids) { ASSERT_EQ(should_be_synchronous, callbacks_run_synchronously.find(id) != diff --git a/cpp/src/arrow/util/test_common.h b/cpp/src/arrow/util/test_common.h index 57ca5686e27..511daed1eca 100644 --- a/cpp/src/arrow/util/test_common.h +++ b/cpp/src/arrow/util/test_common.h @@ -21,7 +21,6 @@ #include "arrow/testing/gtest_util.h" #include "arrow/util/iterator.h" -#include "arrow/util/thread_pool.h" namespace arrow {