diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 308ee49972c..3343ba2adf5 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -190,6 +190,7 @@ set(ARROW_SRCS io/slow.cc io/stdio.cc io/transform.cc + util/async_nursery.cc util/basic_decimal.cc util/bit_block_counter.cc util/bit_run_reader.cc diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 660fb2657b6..6b3bcff8aa5 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -42,6 +42,7 @@ add_arrow_test(utility-test SOURCES align_util_test.cc async_generator_test.cc + async_nursery_test.cc bit_block_counter_test.cc bit_util_test.cc cache_test.cc diff --git a/cpp/src/arrow/util/async_nursery.cc b/cpp/src/arrow/util/async_nursery.cc new file mode 100644 index 00000000000..def884a1f50 --- /dev/null +++ b/cpp/src/arrow/util/async_nursery.cc @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/async_nursery.h" + +#include "arrow/util/logging.h" + +namespace arrow { +namespace util { + +AsyncCloseable::AsyncCloseable() : on_closed_(Future<>::Make()) {} +AsyncCloseable::AsyncCloseable(AsyncCloseable* parent) : on_closed_(Future<>::Make()) { + parent->AddDependentTask(OnClosed()); +} + +AsyncCloseable::~AsyncCloseable() { + DCHECK_NE(nursery_, nullptr) << "An AsyncCloseable must be created with a nursery " + "using MakeSharedCloseable or MakeUniqueCloseable"; +} + +const Future<>& AsyncCloseable::OnClosed() { return on_closed_; } + +void AsyncCloseable::AddDependentTask(const Future<>& task) { + DCHECK(!closed_); + if (num_tasks_outstanding_.fetch_add(1) == 1) { + tasks_finished_ = Future<>::Make(); + } + task.AddCallback([this](const Status& st) { + if (num_tasks_outstanding_.fetch_sub(1) == 1 && closed_.load()) { + tasks_finished_.MarkFinished(st); + } + }); +} + +void AsyncCloseable::SetNursery(Nursery* nursery) { nursery_ = nursery; } + +void AsyncCloseable::Destroy() { + DCHECK_NE(nursery_, nullptr); + closed_ = true; + nursery_->num_closeables_destroyed_.fetch_add(1); + Future<> finish_fut; + if (tasks_finished_.is_valid()) { + if (num_tasks_outstanding_.fetch_sub(1) > 1) { + finish_fut = AllComplete({DoClose(), tasks_finished_}); + } else { + // Any added tasks have already finished so there is nothing to wait for + finish_fut = DoClose(); + } + } else { + // No dependent tasks were added + finish_fut = DoClose(); + } + finish_fut.AddCallback([this](const Status& st) { + if (on_closed_.is_valid()) { + on_closed_.MarkFinished(st); + } + nursery_->OnTaskFinished(st); + delete this; + }); +} + +Status AsyncCloseable::CheckClosed() const { + if (closed_.load()) { + return Status::Invalid("Invalid operation after Close"); + } + return Status::OK(); +} + +void AsyncCloseablePimpl::Init(AsyncCloseable* impl) { impl_ = impl; } +void AsyncCloseablePimpl::Destroy() { impl_->Destroy(); } +void AsyncCloseablePimpl::SetNursery(Nursery* nursery) { impl_->SetNursery(nursery); } + +Nursery::Nursery() : finished_(Future<>::Make()) {} + +Status Nursery::WaitForFinish() { + if (num_closeables_destroyed_.load() != num_closeables_created_.load()) { + return Status::UnknownError( + "Not all closeables that were created during the nursery were destroyed. " + "Something must be holding onto a shared_ptr/unique_ptr reference."); + } + if (num_tasks_outstanding_.fetch_sub(1) == 1) { + // All tasks done, nothing to wait for + return Status::OK(); + } + return finished_.status(); +} + +void Nursery::OnTaskFinished(Status st) { + if (num_tasks_outstanding_.fetch_sub(1) == 1) { + finished_.MarkFinished(std::move(st)); + } +} + +Status Nursery::RunInNursery(std::function task) { + Nursery nursery; + task(&nursery); + return nursery.WaitForFinish(); +} + +Status Nursery::RunInNursery(std::function task) { + Nursery nursery; + Status task_st = task(&nursery); + // Need to wait for everything to finish, even if invalid status + Status close_st = nursery.WaitForFinish(); + return task_st & close_st; +} + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/async_nursery.h b/cpp/src/arrow/util/async_nursery.h new file mode 100644 index 00000000000..9ae1f9aa548 --- /dev/null +++ b/cpp/src/arrow/util/async_nursery.h @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/future.h" +#include "arrow/util/mutex.h" + +namespace arrow { +namespace util { + +class Nursery; +class AsyncCloseablePimpl; + +template +struct DestroyingDeleter { + void operator()(T* p) { p->Destroy(); } +}; + +/// An object which should be asynchronously closed before it is destroyed +/// +/// Any AsyncCloseable must be kept alive until its parent is destroyed (this is a given +/// if the parent is a nursery). For shorter lived tasks/objects consider +/// OwnedAsyncCloseable adding a dependent task. +class ARROW_EXPORT AsyncCloseable : public std::enable_shared_from_this { + public: + AsyncCloseable(); + explicit AsyncCloseable(AsyncCloseable* parent); + virtual ~AsyncCloseable(); + + /// Returns a future that is completed when this object is finished closing + const Future<>& OnClosed(); + + protected: + /// Subclasses should override this and perform any cleanup. Once the future returned + /// by this method finishes then this object is eligible for destruction and any + /// reference to `this` will be invalid + virtual Future<> DoClose() = 0; + + /// This method is called by subclasses to add tasks which must complete before the + /// object can be safely deleted + void AddDependentTask(const Future<>& task); + /// This method can be called by subclasses for error checking purposes. It will + /// return an invalid status if this object has started closing + Status CheckClosed() const; + + Nursery* nursery_; + + private: + void SetNursery(Nursery* nursery); + void Destroy(); + + Future<> on_closed_; + Future<> tasks_finished_; + std::atomic closed_{false}; + std::atomic num_tasks_outstanding_{1}; + + friend Nursery; + template + friend struct DestroyingDeleter; + friend AsyncCloseablePimpl; +}; + +class ARROW_EXPORT AsyncCloseablePimpl { + protected: + void Init(AsyncCloseable* impl); + + private: + void SetNursery(Nursery* nursery); + void Destroy(); + + AsyncCloseable* impl_; + + friend Nursery; + template + friend struct DestroyingDeleter; +}; + +class ARROW_EXPORT Nursery { + public: + template + typename std::enable_if::value, std::shared_ptr>::type + MakeSharedCloseable(Args&&... args) { + static_assert(std::is_base_of::value, + "Nursery::MakeSharedCloseable only works with AsyncCloseable types"); + num_closeables_created_.fetch_add(1); + num_tasks_outstanding_.fetch_add(1); + std::shared_ptr shared_closeable(new T(std::forward(args)...), + DestroyingDeleter()); + shared_closeable->SetNursery(this); + return shared_closeable; + } + + template + typename std::enable_if::value, + std::unique_ptr>>::type + MakeUniqueCloseable(Args&&... args) { + static_assert(std::is_base_of::value, + "Nursery::MakeUniqueCloseable only works with AsyncCloseable types"); + num_closeables_created_.fetch_add(1); + num_tasks_outstanding_.fetch_add(1); + auto unique_closeable = std::unique_ptr>( + new T(std::forward(args)...), DestroyingDeleter()); + unique_closeable->SetNursery(this); + return unique_closeable; + } + + template + void AddDependentTask(const Future& task) { + num_tasks_outstanding_.fetch_add(1); + task.AddCallback([this](const Result& res) { OnTaskFinished(res.status()); }); + } + + /// Runs `task` within a nursery. This method will not return until + /// all roots added by the task have been closed and destroyed. + static Status RunInNursery(std::function task); + static Status RunInNursery(std::function task); + + protected: + Status WaitForFinish(); + void OnTaskFinished(Status st); + + Nursery(); + + // Rather than keep a separate closing flag (and requiring mutex) we treat + // "closing" as a default task + std::atomic num_tasks_outstanding_{1}; + std::atomic num_closeables_created_{0}; + std::atomic num_closeables_destroyed_{0}; + Future<> finished_; + + friend AsyncCloseable; +}; + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/async_nursery_test.cc b/cpp/src/arrow/util/async_nursery_test.cc new file mode 100644 index 00000000000..f3faf20ff93 --- /dev/null +++ b/cpp/src/arrow/util/async_nursery_test.cc @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/async_nursery.h" + +#include + +#include + +#include "arrow/result.h" +#include "arrow/testing/gtest_common.h" +#include "arrow/testing/gtest_util.h" + +namespace arrow { +namespace util { + +class GatingDoClose : public AsyncCloseable { + public: + GatingDoClose(AsyncCloseable* parent, Future<> close_future) + : AsyncCloseable(parent), close_future_(std::move(close_future)) {} + explicit GatingDoClose(Future<> close_future) + : close_future_(std::move(close_future)) {} + + Future<> DoClose() override { return close_future_; } + + Future<> close_future_; +}; + +class GatingDoCloseWithUniqueChild : public AsyncCloseable { + public: + GatingDoCloseWithUniqueChild(Nursery* nursery, Future<> close_future) + : child_( + nursery->MakeUniqueCloseable(this, std::move(close_future))) {} + + Future<> DoClose() override { + child_.reset(); + return Future<>::MakeFinished(); + } + + std::unique_ptr> child_; +}; + +class GatingDoCloseWithSharedChild : public AsyncCloseable { + public: + GatingDoCloseWithSharedChild(Nursery* nursery, Future<> close_future) + : child_( + nursery->MakeSharedCloseable(this, std::move(close_future))) {} + + Future<> DoClose() override { + child_.reset(); + return Future<>::MakeFinished(); + } + + std::shared_ptr child_; +}; + +class GatingDoCloseAsDependentTask : public AsyncCloseable { + public: + explicit GatingDoCloseAsDependentTask(Future<> close_future) { + AddDependentTask(std::move(close_future)); + } + + Future<> DoClose() override { return Future<>::MakeFinished(); } +}; + +class MarkWhenDestroyed : public GatingDoClose { + public: + MarkWhenDestroyed(Future<> close_future, bool* destroyed) + : GatingDoClose(std::move(close_future)), destroyed_(destroyed) {} + ~MarkWhenDestroyed() { *destroyed_ = true; } + + private: + bool* destroyed_; +}; + +class EvictsChild : public AsyncCloseable { + public: + EvictsChild(Nursery* nursery, bool* child_destroyed, Future<> child_future, + Future<> final_future) + : final_close_future_(std::move(final_future)) { + owned_child_ = nursery->MakeSharedCloseable( + std::move(child_future), child_destroyed); + } + + void EvictChild() { owned_child_.reset(); } + + Future<> DoClose() override { return final_close_future_; } + + private: + Future<> final_close_future_; + std::shared_ptr owned_child_; +}; + +template +void AssertDoesNotCloseEarly() { + Future<> gate = Future<>::Make(); + std::atomic finished{false}; + std::thread thread([&] { + ASSERT_OK(Nursery::RunInNursery( + [&](Nursery* nursery) { nursery->MakeSharedCloseable(gate); })); + finished.store(true); + }); + + SleepABit(); + ASSERT_FALSE(finished.load()); + gate.MarkFinished(); + BusyWait(10, [&] { return finished.load(); }); + thread.join(); +} + +template +void AssertDoesNotCloseEarlyWithChild() { + Future<> gate = Future<>::Make(); + std::atomic finished{false}; + std::thread thread([&] { + ASSERT_OK(Nursery::RunInNursery( + [&](Nursery* nursery) { nursery->MakeSharedCloseable(nursery, gate); })); + finished.store(true); + }); + + SleepABit(); + ASSERT_FALSE(finished.load()); + gate.MarkFinished(); + BusyWait(10, [&] { return finished.load(); }); + thread.join(); +} + +TEST(AsyncNursery, DoClose) { AssertDoesNotCloseEarly(); } + +TEST(AsyncNursery, SharedChildDoClose) { + AssertDoesNotCloseEarlyWithChild(); +} + +TEST(AsyncNursery, UniqueChildDoClose) { + AssertDoesNotCloseEarlyWithChild(); +} + +TEST(AsyncNursery, DependentTask) { + AssertDoesNotCloseEarly(); +} + +TEST(AsyncNursery, EvictedChild) { + Future<> child_future = Future<>::Make(); + Future<> final_future = Future<>::Make(); + std::atomic finished{false}; + std::thread thread([&] { + ASSERT_OK(Nursery::RunInNursery([&](Nursery* nursery) { + bool child_destroyed = false; + std::shared_ptr evicts_child = + nursery->MakeSharedCloseable(nursery, &child_destroyed, + child_future, final_future); + evicts_child->EvictChild(); + // Owner no longer has reference to child here but it's kept alive by nursery + // because it isn't done + ASSERT_FALSE(child_destroyed); + child_future.MarkFinished(); + ASSERT_TRUE(child_destroyed); + })); + finished.store(true); + }); + + SleepABit(); + ASSERT_FALSE(finished.load()); + final_future.MarkFinished(); + BusyWait(10, [&] { return finished.load(); }); + thread.join(); +} + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index fc8022a95e4..c67e9b34969 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -312,11 +312,13 @@ class ConcreteFutureImpl : public FutureImpl { waiter_->MarkFutureFinishedUnlocked(waiter_arg_, state); } } - cv_.notify_all(); - auto callbacks = std::move(callbacks_); auto self = shared_from_this(); + // Don't notify until we've saved off all state. After notifying it is common for + // this to be deleted and we shouldn't access any local state. + cv_.notify_all(); + // run callbacks, lock not needed since the future is finished by this // point so nothing else can modify the callbacks list and it is safe // to iterate.