From d90837c05728c4d968552563e8db285b2898199a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 19 Aug 2021 17:48:46 -1000 Subject: [PATCH 01/11] Initial sketch of asynchronous nursery --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/util/CMakeLists.txt | 93 +++++++------- cpp/src/arrow/util/async_nursery.cc | 127 +++++++++++++++++++ cpp/src/arrow/util/async_nursery.h | 144 +++++++++++++++++++++ cpp/src/arrow/util/async_nursery_test.cc | 151 +++++++++++++++++++++++ 5 files changed, 470 insertions(+), 46 deletions(-) create mode 100644 cpp/src/arrow/util/async_nursery.cc create mode 100644 cpp/src/arrow/util/async_nursery.h create mode 100644 cpp/src/arrow/util/async_nursery_test.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 308ee49972c..3343ba2adf5 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -190,6 +190,7 @@ set(ARROW_SRCS io/slow.cc io/stdio.cc io/transform.cc + util/async_nursery.cc util/basic_decimal.cc util/bit_block_counter.cc util/bit_run_reader.cc diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 660fb2657b6..194e2e5d8e1 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -26,56 +26,57 @@ arrow_install_all_headers("arrow/util") # arrow_test_main # -if(WIN32) - # This manifest enables long file paths on Windows 10+ - # See https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#enable-long-paths-in-windows-10-version-1607-and-later - if(MSVC) - set(IO_UTIL_TEST_SOURCES io_util_test.cc io_util_test.manifest) - else() - set(IO_UTIL_TEST_SOURCES io_util_test.cc io_util_test.rc) - endif() -else() - set(IO_UTIL_TEST_SOURCES io_util_test.cc) -endif() +if (WIN32) + # This manifest enables long file paths on Windows 10+ + # See https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#enable-long-paths-in-windows-10-version-1607-and-later + if (MSVC) + set(IO_UTIL_TEST_SOURCES io_util_test.cc io_util_test.manifest) + else () + set(IO_UTIL_TEST_SOURCES io_util_test.cc io_util_test.rc) + endif () +else () + set(IO_UTIL_TEST_SOURCES io_util_test.cc) +endif () add_arrow_test(utility-test - SOURCES - align_util_test.cc - async_generator_test.cc - bit_block_counter_test.cc - bit_util_test.cc - cache_test.cc - checked_cast_test.cc - compression_test.cc - decimal_test.cc - formatting_util_test.cc - key_value_metadata_test.cc - hashing_test.cc - int_util_test.cc - ${IO_UTIL_TEST_SOURCES} - iterator_test.cc - logging_test.cc - queue_test.cc - range_test.cc - reflection_test.cc - rle_encoding_test.cc - stl_util_test.cc - string_test.cc - tdigest_test.cc - test_common.cc - time_test.cc - trie_test.cc - uri_test.cc - utf8_util_test.cc - value_parsing_test.cc - variant_test.cc) + SOURCES + align_util_test.cc + async_generator_test.cc + async_nursery_test.cc + bit_block_counter_test.cc + bit_util_test.cc + cache_test.cc + checked_cast_test.cc + compression_test.cc + decimal_test.cc + formatting_util_test.cc + key_value_metadata_test.cc + hashing_test.cc + int_util_test.cc + ${IO_UTIL_TEST_SOURCES} + iterator_test.cc + logging_test.cc + queue_test.cc + range_test.cc + reflection_test.cc + rle_encoding_test.cc + stl_util_test.cc + string_test.cc + tdigest_test.cc + test_common.cc + time_test.cc + trie_test.cc + uri_test.cc + utf8_util_test.cc + value_parsing_test.cc + variant_test.cc) add_arrow_test(threading-utility-test - SOURCES - cancel_test.cc - future_test.cc - task_group_test.cc - thread_pool_test.cc) + SOURCES + cancel_test.cc + future_test.cc + task_group_test.cc + thread_pool_test.cc) add_arrow_benchmark(bit_block_counter_benchmark) add_arrow_benchmark(bit_util_benchmark) diff --git a/cpp/src/arrow/util/async_nursery.cc b/cpp/src/arrow/util/async_nursery.cc new file mode 100644 index 00000000000..2147cec3ad8 --- /dev/null +++ b/cpp/src/arrow/util/async_nursery.cc @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/async_nursery.h" + +#include "arrow/util/logging.h" + +namespace arrow { +namespace util { + +AsyncCloseable::AsyncCloseable(AsyncCloseable* parent) : on_closed_(Future<>::Make()) { + if (parent) { + Mutex::Guard guard = mutex_.Lock(); + parent->children_.push_back(this); + self_itr_ = --parent->children_.end(); + } +} + +AsyncCloseable::~AsyncCloseable() { DCHECK(close_complete_.load()); } + +Future<> AsyncCloseable::OnClosed() { return on_closed_; } + +Future<> AsyncCloseable::Close() { + { + Mutex::Guard guard = mutex_.Lock(); + if (closed_.load()) { + return Future<>::MakeFinished(); + } + closed_.store(true); + } + return DoClose() + .Then([this] { + close_complete_.store(true); + on_closed_.MarkFinished(); + return CloseChildren(); + }) + .Then([] {}, + [this](const Status& err) { + close_complete_.store(true); + on_closed_.MarkFinished(err); + return err; + }); +} + +Future<> AsyncCloseable::CloseChildren() { + for (auto& child : children_) { + tasks_.push_back(child->Close()); + } + return AllComplete(tasks_); +} + +Status AsyncCloseable::CheckClosed() const { + if (closed_.load()) { + return Status::Invalid("Invalid operation after Close"); + } + return Status::OK(); +} + +void AsyncCloseable::AssertNotCloseComplete() const { DCHECK(!close_complete_); } + +void AsyncCloseable::AddDependentTask(Future<> task) { + tasks_.push_back(std::move(task)); +} + +OwnedAsyncCloseable::OwnedAsyncCloseable(AsyncCloseable* parent) + : AsyncCloseable(parent) { + parent_ = parent; +} + +void OwnedAsyncCloseable::Init() { + Mutex::Guard lock = parent_->mutex_.Lock(); + parent_->owned_children_.push_back(shared_from_this()); + owned_self_itr_ = --parent_->owned_children_.end(); +} + +void OwnedAsyncCloseable::Evict() { + { + Mutex::Guard lock = parent_->mutex_.Lock(); + if (parent_->closed_) { + // Parent is already closing, no need to do anything, the parent will call close on + // this instance eventually + return; + } + parent_->children_.erase(self_itr_); + } + // We need to add a dependent task to make sure our parent does not close itself + // while we are shutting down. + parent_->AddDependentTask(Close().Then([this] { + Mutex::Guard lock = parent_->mutex_.Lock(); + parent_->owned_children_.erase(owned_self_itr_); + })); +} + +NurseryPimpl::~NurseryPimpl() = default; + +Nursery::Nursery() : AsyncCloseable(nullptr) {} +Future<> Nursery::DoClose() { return Future<>::MakeFinished(); } + +Status Nursery::RunInNursery(std::function task) { + Nursery nursery; + task(&nursery); + return nursery.Close().status(); +} + +Status Nursery::RunInNurserySt(std::function task) { + Nursery nursery; + Status task_st = task(&nursery); + Status close_st = nursery.Close().status(); + return task_st & close_st; +} + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/async_nursery.h b/cpp/src/arrow/util/async_nursery.h new file mode 100644 index 00000000000..51355f7e1ee --- /dev/null +++ b/cpp/src/arrow/util/async_nursery.h @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_ASYNC_NURSERY_H +#define ARROW_ASYNC_NURSERY_H + +#include + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/future.h" +#include "arrow/util/mutex.h" + +namespace arrow { +namespace util { + +class OwnedAsyncCloseable; + +/// An object which should be asynchronously closed before it is destroyed +/// +/// Any AsyncCloseable must be kept alive until its parent is destroyed (this is a given +/// if the parent is a nursery). For shorter lived tasks/objects consider +/// OwnedAsyncCloseable adding a dependent task. +class AsyncCloseable { + public: + /// \brief Construct an AsyncCloseable as a child of `parent` + explicit AsyncCloseable(AsyncCloseable* parent); + virtual ~AsyncCloseable(); + + /// Returns a future that is completed when this object is finished closing + Future<> OnClosed(); + + protected: + /// Closes the AsyncCloseable + /// This will first call DoClose and then simultaneously close all children and + /// tasks_ + virtual Future<> Close(); + /// Subclasses should override this and perform any cleanup that is not captured by + /// tasks_. Once the future returned by this method finishes then this object is + /// eligible for destruction and any reference to `this` may be invalid + virtual Future<> DoClose() = 0; + + /// This method is called by subclasses to add tasks which must complete before the + /// object can be safely deleted + void AddDependentTask(Future<> task); + /// This method can be called by subclasses for error checking purposes. It will + /// return an invalid status if this object has started closing + Status CheckClosed() const; + /// This can be used for sanity checking that a callback is not run after close has + /// been finished. It will assert if the object has been fully closed (and `this` + /// references are unsafe) + void AssertNotCloseComplete() const; + + private: + Future<> CloseChildren(); + + std::list children_; + std::list::iterator self_itr_; + std::list> owned_children_; + std::vector> tasks_; + Future<> on_closed_; + std::atomic closed_{false}; + std::atomic close_complete_{false}; + util::Mutex mutex_; + + friend OwnedAsyncCloseable; +}; + +/// An override of AsyncCloseable which is eligible for eviction. Instances must be +/// created as shared pointers as this utilizes enable_shared_from_this. +class OwnedAsyncCloseable : public AsyncCloseable, + public std::enable_shared_from_this { + public: + explicit OwnedAsyncCloseable(AsyncCloseable* parent); + + void Init(); + /// \brief Marks this instance as eligible for removal. This will start the close + /// process and, when it is finished, the instance will be deleted. + void Evict(); + + protected: + AsyncCloseable* parent_; + std::list>::iterator owned_self_itr_; +}; + +/// Helper base class which allows classes that implement the pimpl pattern to +/// participate in a nursery. This is only needed if the type is going to be +/// used as a nursery root. +class NurseryPimpl { + public: + virtual ~NurseryPimpl(); + /// Transfers ownership of the impl to the nursery + virtual std::shared_ptr TransferOwnership() = 0; +}; + +class Nursery : private AsyncCloseable { + public: + template + std::shared_ptr AddRoot(std::function(AsyncCloseable*)> factory) { + std::shared_ptr root = factory(this); + roots_.push_back(std::static_pointer_cast(root)); + return root; + } + + template + std::shared_ptr AddPimplRoot( + std::function(AsyncCloseable*)> factory) { + std::shared_ptr pimpl = factory(this); + roots_.push_back(pimpl->TransferOwnership()); + return pimpl; + } + + /// Runs `task` within a nursery. This method will not return until + /// all roots added by the task have been closed and destroyed. + static Status RunInNursery(std::function task); + // FIXME template hackery to callable that can return void/Status/Result + static Status RunInNurserySt(std::function task); + + protected: + Future<> DoClose() override; + + Nursery(); + + std::vector> roots_; +}; + +} // namespace util +} // namespace arrow + +#endif // ARROW_ASYNC_NURSERY_H diff --git a/cpp/src/arrow/util/async_nursery_test.cc b/cpp/src/arrow/util/async_nursery_test.cc new file mode 100644 index 00000000000..358c20dbd18 --- /dev/null +++ b/cpp/src/arrow/util/async_nursery_test.cc @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/async_nursery.h" + +#include + +#include + +#include "arrow/result.h" +#include "arrow/testing/gtest_common.h" +#include "arrow/testing/gtest_util.h" + +namespace arrow { +namespace util { + +class GatingDoClose : public AsyncCloseable { + public: + GatingDoClose(AsyncCloseable* parent, Future<> close_future) + : AsyncCloseable(parent), close_future_(std::move(close_future)) {} + + Future<> DoClose() override { return close_future_; } + + Future<> close_future_; +}; + +class GatingDoCloseWithChild : public AsyncCloseable { + public: + GatingDoCloseWithChild(AsyncCloseable* parent, Future<> close_future) + : AsyncCloseable(parent), child_(this, std::move(close_future)) {} + + Future<> DoClose() override { return Future<>::MakeFinished(); } + + GatingDoClose child_; +}; + +class GatingDoCloseAsDependentTask : public AsyncCloseable { + public: + GatingDoCloseAsDependentTask(AsyncCloseable* parent, Future<> close_future) + : AsyncCloseable(parent) { + AddDependentTask(std::move(close_future)); + } + + Future<> DoClose() override { return Future<>::MakeFinished(); } +}; + +class EvictableChild : public OwnedAsyncCloseable { + public: + EvictableChild(AsyncCloseable* parent, Future<> close_future) + : OwnedAsyncCloseable(parent), close_future_(std::move(close_future)) {} + + Future<> DoClose() override { return close_future_; } + + private: + Future<> close_future_; +}; + +class EvictsChild : public AsyncCloseable { + public: + EvictsChild(AsyncCloseable* parent, Future<> child_future, Future<> final_future) + : AsyncCloseable(parent), final_close_future_(std::move(final_future)) { + owned_child_ = std::make_shared(this, std::move(child_future)); + owned_child_->Init(); + child_ = owned_child_; + } + + void EvictChild() { + owned_child_->Evict(); + owned_child_.reset(); + } + + bool ChildIsExpired() { return child_.expired(); } + + Future<> DoClose() override { return final_close_future_; } + + private: + Future<> final_close_future_; + std::shared_ptr owned_child_; + std::weak_ptr child_; +}; + +template +void AssertDoesNotCloseEarly() { + Future<> gate = Future<>::Make(); + std::atomic finished{false}; + std::thread thread([&] { + ASSERT_OK(Nursery::RunInNursery([&](Nursery* nursery) { + nursery->AddRoot( + [&](AsyncCloseable* parent) { return std::make_shared(parent, gate); }); + })); + finished.store(true); + }); + + SleepABit(); + ASSERT_FALSE(finished.load()); + gate.MarkFinished(); + BusyWait(10, [&] { return finished.load(); }); + thread.join(); +}; + +TEST(AsyncNursery, DoClose) { AssertDoesNotCloseEarly(); } + +TEST(AsyncNursery, ChildDoClose) { AssertDoesNotCloseEarly(); } + +TEST(AsyncNursery, DependentTask) { + AssertDoesNotCloseEarly(); +} + +TEST(AsyncNursery, EvictedChild) { + Future<> child_future = Future<>::Make(); + Future<> final_future = Future<>::Make(); + std::atomic finished{false}; + std::thread thread([&] { + ASSERT_OK(Nursery::RunInNursery([&](Nursery* nursery) { + std::shared_ptr evicts_child = + nursery->AddRoot([&](AsyncCloseable* parent) { + return std::make_shared(parent, child_future, final_future); + }); + evicts_child->EvictChild(); + // Owner no longer has reference to child here but it's kept alive by nursery + // because it isn't done + ASSERT_FALSE(evicts_child->ChildIsExpired()); + child_future.MarkFinished(); + ASSERT_TRUE(evicts_child->ChildIsExpired()); + })); + finished.store(true); + }); + + SleepABit(); + ASSERT_FALSE(finished.load()); + final_future.MarkFinished(); + BusyWait(10, [&] { return finished.load(); }); + thread.join(); +} + +} // namespace util +} // namespace arrow \ No newline at end of file From 99abda3e9913a840a3562c16bed18e97ffbec2f5 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 20 Aug 2021 13:23:04 -1000 Subject: [PATCH 02/11] ARROW-13542: Updated async nursery to use custom deleter, made parents optional --- cpp/src/arrow/util/async_nursery.cc | 133 +++++++++++------------ cpp/src/arrow/util/async_nursery.h | 121 ++++++++++----------- cpp/src/arrow/util/async_nursery_test.cc | 96 ++++++++++------ 3 files changed, 185 insertions(+), 165 deletions(-) diff --git a/cpp/src/arrow/util/async_nursery.cc b/cpp/src/arrow/util/async_nursery.cc index 2147cec3ad8..e0ff2fda081 100644 --- a/cpp/src/arrow/util/async_nursery.cc +++ b/cpp/src/arrow/util/async_nursery.cc @@ -22,45 +22,62 @@ namespace arrow { namespace util { -AsyncCloseable::AsyncCloseable(AsyncCloseable* parent) : on_closed_(Future<>::Make()) { - if (parent) { - Mutex::Guard guard = mutex_.Lock(); - parent->children_.push_back(this); - self_itr_ = --parent->children_.end(); - } +AsyncCloseable::AsyncCloseable() = default; +AsyncCloseable::AsyncCloseable(AsyncCloseable* parent) { + AddDependentTask(parent->OnClosed()); } -AsyncCloseable::~AsyncCloseable() { DCHECK(close_complete_.load()); } +AsyncCloseable::~AsyncCloseable() { + // FIXME - Would be awesome if there were a way to enforce this at compile time + DCHECK_NE(nursery_, nullptr) << "An AsyncCloseable must be created with a nursery " + "using MakeSharedCloseable or MakeUniqueCloseable"; +} -Future<> AsyncCloseable::OnClosed() { return on_closed_; } +const Future<>& AsyncCloseable::OnClosed() { + // Lazily create the future to save effort if we don't need it + if (!on_closed_.is_valid()) { + on_closed_ = Future<>::Make(); + } + return on_closed_; +} -Future<> AsyncCloseable::Close() { - { - Mutex::Guard guard = mutex_.Lock(); - if (closed_.load()) { - return Future<>::MakeFinished(); +void AsyncCloseable::AddDependentTask(const Future<>& task) { + DCHECK(!closed_); + if (num_tasks_outstanding_.fetch_add(1) == 1) { + tasks_finished_ = Future<>::Make(); + }; + task.AddCallback([this](const Status& st) { + if (num_tasks_outstanding_.fetch_sub(1) == 1 && closed_.load()) { + tasks_finished_.MarkFinished(st); } - closed_.store(true); - } - return DoClose() - .Then([this] { - close_complete_.store(true); - on_closed_.MarkFinished(); - return CloseChildren(); - }) - .Then([] {}, - [this](const Status& err) { - close_complete_.store(true); - on_closed_.MarkFinished(err); - return err; - }); + }); } -Future<> AsyncCloseable::CloseChildren() { - for (auto& child : children_) { - tasks_.push_back(child->Close()); +void AsyncCloseable::SetNursery(Nursery* nursery) { nursery_ = nursery; } + +void AsyncCloseable::Destroy() { + DCHECK_NE(nursery_, nullptr); + closed_ = true; + nursery_->num_closeables_destroyed_.fetch_add(1); + Future<> finish_fut; + if (tasks_finished_.is_valid()) { + if (num_tasks_outstanding_.fetch_sub(1) > 1) { + finish_fut = AllComplete({DoClose(), tasks_finished_}); + } else { + // Any added tasks have already finished so there is nothing to wait for + finish_fut = DoClose(); + } + } else { + // No dependent tasks were added + finish_fut = DoClose(); } - return AllComplete(tasks_); + finish_fut.AddCallback([this](const Status& st) { + if (on_closed_.is_valid()) { + on_closed_.MarkFinished(st); + } + nursery_->OnTaskFinished(st); + delete this; + }); } Status AsyncCloseable::CheckClosed() const { @@ -70,56 +87,38 @@ Status AsyncCloseable::CheckClosed() const { return Status::OK(); } -void AsyncCloseable::AssertNotCloseComplete() const { DCHECK(!close_complete_); } - -void AsyncCloseable::AddDependentTask(Future<> task) { - tasks_.push_back(std::move(task)); -} - -OwnedAsyncCloseable::OwnedAsyncCloseable(AsyncCloseable* parent) - : AsyncCloseable(parent) { - parent_ = parent; -} +Nursery::Nursery() : finished_(Future<>::Make()){}; -void OwnedAsyncCloseable::Init() { - Mutex::Guard lock = parent_->mutex_.Lock(); - parent_->owned_children_.push_back(shared_from_this()); - owned_self_itr_ = --parent_->owned_children_.end(); +Status Nursery::WaitForFinish() { + if (num_closeables_destroyed_.load() != num_closeables_created_.load()) { + return Status::UnknownError( + "Not all closeables that were created during the nursery were destroyed. " + "Something must be holding onto a shared_ptr/unique_ptr reference."); + } + if (num_tasks_outstanding_.fetch_sub(1) == 1) { + // All tasks done, nothing to wait for + return Status::OK(); + } + return finished_.status(); } -void OwnedAsyncCloseable::Evict() { - { - Mutex::Guard lock = parent_->mutex_.Lock(); - if (parent_->closed_) { - // Parent is already closing, no need to do anything, the parent will call close on - // this instance eventually - return; - } - parent_->children_.erase(self_itr_); +void Nursery::OnTaskFinished(Status st) { + if (num_tasks_outstanding_.fetch_sub(1) == 1) { + finished_.MarkFinished(std::move(st)); } - // We need to add a dependent task to make sure our parent does not close itself - // while we are shutting down. - parent_->AddDependentTask(Close().Then([this] { - Mutex::Guard lock = parent_->mutex_.Lock(); - parent_->owned_children_.erase(owned_self_itr_); - })); } -NurseryPimpl::~NurseryPimpl() = default; - -Nursery::Nursery() : AsyncCloseable(nullptr) {} -Future<> Nursery::DoClose() { return Future<>::MakeFinished(); } - Status Nursery::RunInNursery(std::function task) { Nursery nursery; task(&nursery); - return nursery.Close().status(); + return nursery.WaitForFinish(); } Status Nursery::RunInNurserySt(std::function task) { Nursery nursery; Status task_st = task(&nursery); - Status close_st = nursery.Close().status(); + // Need to wait for everything to finish, even if invalid status + Status close_st = nursery.WaitForFinish(); return task_st & close_st; } diff --git a/cpp/src/arrow/util/async_nursery.h b/cpp/src/arrow/util/async_nursery.h index 51355f7e1ee..d7f63fb80d6 100644 --- a/cpp/src/arrow/util/async_nursery.h +++ b/cpp/src/arrow/util/async_nursery.h @@ -28,114 +28,109 @@ namespace arrow { namespace util { -class OwnedAsyncCloseable; +class Nursery; + +template +struct DestroyingDeleter { + void operator()(T* p) { p->Destroy(); } +}; /// An object which should be asynchronously closed before it is destroyed /// /// Any AsyncCloseable must be kept alive until its parent is destroyed (this is a given /// if the parent is a nursery). For shorter lived tasks/objects consider /// OwnedAsyncCloseable adding a dependent task. -class AsyncCloseable { +class AsyncCloseable : public std::enable_shared_from_this { public: - /// \brief Construct an AsyncCloseable as a child of `parent` + AsyncCloseable(); explicit AsyncCloseable(AsyncCloseable* parent); virtual ~AsyncCloseable(); /// Returns a future that is completed when this object is finished closing - Future<> OnClosed(); + const Future<>& OnClosed(); protected: - /// Closes the AsyncCloseable - /// This will first call DoClose and then simultaneously close all children and - /// tasks_ - virtual Future<> Close(); - /// Subclasses should override this and perform any cleanup that is not captured by - /// tasks_. Once the future returned by this method finishes then this object is - /// eligible for destruction and any reference to `this` may be invalid + /// Subclasses should override this and perform any cleanup. Once the future returned + /// by this method finishes then this object is eligible for destruction and any + /// reference to `this` will be invalid virtual Future<> DoClose() = 0; /// This method is called by subclasses to add tasks which must complete before the /// object can be safely deleted - void AddDependentTask(Future<> task); + void AddDependentTask(const Future<>& task); /// This method can be called by subclasses for error checking purposes. It will /// return an invalid status if this object has started closing Status CheckClosed() const; - /// This can be used for sanity checking that a callback is not run after close has - /// been finished. It will assert if the object has been fully closed (and `this` - /// references are unsafe) - void AssertNotCloseComplete() const; private: - Future<> CloseChildren(); + void SetNursery(Nursery* nursery); + void Destroy(); - std::list children_; - std::list::iterator self_itr_; - std::list> owned_children_; - std::vector> tasks_; Future<> on_closed_; + Future<> tasks_finished_; std::atomic closed_{false}; - std::atomic close_complete_{false}; - util::Mutex mutex_; - - friend OwnedAsyncCloseable; -}; - -/// An override of AsyncCloseable which is eligible for eviction. Instances must be -/// created as shared pointers as this utilizes enable_shared_from_this. -class OwnedAsyncCloseable : public AsyncCloseable, - public std::enable_shared_from_this { - public: - explicit OwnedAsyncCloseable(AsyncCloseable* parent); - - void Init(); - /// \brief Marks this instance as eligible for removal. This will start the close - /// process and, when it is finished, the instance will be deleted. - void Evict(); + std::atomic num_tasks_outstanding_{1}; + Nursery* nursery_; - protected: - AsyncCloseable* parent_; - std::list>::iterator owned_self_itr_; + friend Nursery; + template + friend struct DestroyingDeleter; }; -/// Helper base class which allows classes that implement the pimpl pattern to -/// participate in a nursery. This is only needed if the type is going to be -/// used as a nursery root. -class NurseryPimpl { +class Nursery { public: - virtual ~NurseryPimpl(); - /// Transfers ownership of the impl to the nursery - virtual std::shared_ptr TransferOwnership() = 0; -}; + // FIXME: Add static_assert that T extends AsyncCloseable for friendlier error message + template + typename std::enable_if::value, std::shared_ptr>::type + MakeSharedCloseable(Args&&... args) { + num_closeables_created_.fetch_add(1); + num_tasks_outstanding_.fetch_add(1); + std::shared_ptr shared_closeable(new T(std::forward(args)...), + DestroyingDeleter()); + shared_closeable->SetNursery(this); + return shared_closeable; + } -class Nursery : private AsyncCloseable { - public: - template - std::shared_ptr AddRoot(std::function(AsyncCloseable*)> factory) { - std::shared_ptr root = factory(this); - roots_.push_back(std::static_pointer_cast(root)); - return root; + template + typename std::enable_if::value, + std::unique_ptr>>::type + MakeUniqueCloseable(Args&&... args) { + num_closeables_created_.fetch_add(1); + num_tasks_outstanding_.fetch_add(1); + auto unique_closeable = std::unique_ptr>( + new T(std::forward(args)...), DestroyingDeleter()); + unique_closeable->SetNursery(this); + return unique_closeable; } template - std::shared_ptr AddPimplRoot( - std::function(AsyncCloseable*)> factory) { - std::shared_ptr pimpl = factory(this); - roots_.push_back(pimpl->TransferOwnership()); - return pimpl; + void AddDependentTask(const Future& task) { + num_tasks_outstanding_.fetch_add(1); + task.AddCallback([this](const Result& res) { OnTaskFinished(res.status()); }); } /// Runs `task` within a nursery. This method will not return until /// all roots added by the task have been closed and destroyed. static Status RunInNursery(std::function task); // FIXME template hackery to callable that can return void/Status/Result + // to work correctly with PIMPL objects this may just mean making sure the + // Destroy method works. Or maybe we want a marker class for PIMPL? static Status RunInNurserySt(std::function task); protected: - Future<> DoClose() override; + Status WaitForFinish(); + void OnTaskFinished(Status st); Nursery(); - std::vector> roots_; + // Rather than keep a separate closing flag (and requiring mutex) we treat + // "closing" as a default task + std::atomic num_tasks_outstanding_{1}; + std::atomic num_closeables_created_{0}; + std::atomic num_closeables_destroyed_{0}; + Future<> finished_; + + friend AsyncCloseable; }; } // namespace util diff --git a/cpp/src/arrow/util/async_nursery_test.cc b/cpp/src/arrow/util/async_nursery_test.cc index 358c20dbd18..e72d9e2651e 100644 --- a/cpp/src/arrow/util/async_nursery_test.cc +++ b/cpp/src/arrow/util/async_nursery_test.cc @@ -32,65 +32,70 @@ class GatingDoClose : public AsyncCloseable { public: GatingDoClose(AsyncCloseable* parent, Future<> close_future) : AsyncCloseable(parent), close_future_(std::move(close_future)) {} + GatingDoClose(Future<> close_future) : close_future_(std::move(close_future)) {} Future<> DoClose() override { return close_future_; } Future<> close_future_; }; -class GatingDoCloseWithChild : public AsyncCloseable { +class GatingDoCloseWithUniqueChild : public AsyncCloseable { public: - GatingDoCloseWithChild(AsyncCloseable* parent, Future<> close_future) - : AsyncCloseable(parent), child_(this, std::move(close_future)) {} + GatingDoCloseWithUniqueChild(Nursery* nursery, Future<> close_future) + : child_( + nursery->MakeUniqueCloseable(this, std::move(close_future))) {} Future<> DoClose() override { return Future<>::MakeFinished(); } - GatingDoClose child_; + std::unique_ptr> child_; +}; + +class GatingDoCloseWithSharedChild : public AsyncCloseable { + public: + GatingDoCloseWithSharedChild(Nursery* nursery, Future<> close_future) + : child_( + nursery->MakeSharedCloseable(this, std::move(close_future))) {} + + Future<> DoClose() override { return Future<>::MakeFinished(); } + + std::shared_ptr child_; }; class GatingDoCloseAsDependentTask : public AsyncCloseable { public: - GatingDoCloseAsDependentTask(AsyncCloseable* parent, Future<> close_future) - : AsyncCloseable(parent) { + GatingDoCloseAsDependentTask(Future<> close_future) { AddDependentTask(std::move(close_future)); } Future<> DoClose() override { return Future<>::MakeFinished(); } }; -class EvictableChild : public OwnedAsyncCloseable { +class MarkWhenDestroyed : public GatingDoClose { public: - EvictableChild(AsyncCloseable* parent, Future<> close_future) - : OwnedAsyncCloseable(parent), close_future_(std::move(close_future)) {} - - Future<> DoClose() override { return close_future_; } + MarkWhenDestroyed(Future<> close_future, bool* destroyed) + : GatingDoClose(std::move(close_future)), destroyed_(destroyed) {} + ~MarkWhenDestroyed() { *destroyed_ = true; } private: - Future<> close_future_; + bool* destroyed_; }; class EvictsChild : public AsyncCloseable { public: - EvictsChild(AsyncCloseable* parent, Future<> child_future, Future<> final_future) - : AsyncCloseable(parent), final_close_future_(std::move(final_future)) { - owned_child_ = std::make_shared(this, std::move(child_future)); - owned_child_->Init(); - child_ = owned_child_; - } - - void EvictChild() { - owned_child_->Evict(); - owned_child_.reset(); + EvictsChild(Nursery* nursery, bool* child_destroyed, Future<> child_future, + Future<> final_future) + : final_close_future_(std::move(final_future)) { + owned_child_ = nursery->MakeSharedCloseable( + std::move(child_future), child_destroyed); } - bool ChildIsExpired() { return child_.expired(); } + void EvictChild() { owned_child_.reset(); } Future<> DoClose() override { return final_close_future_; } private: Future<> final_close_future_; - std::shared_ptr owned_child_; - std::weak_ptr child_; + std::shared_ptr owned_child_; }; template @@ -98,10 +103,25 @@ void AssertDoesNotCloseEarly() { Future<> gate = Future<>::Make(); std::atomic finished{false}; std::thread thread([&] { - ASSERT_OK(Nursery::RunInNursery([&](Nursery* nursery) { - nursery->AddRoot( - [&](AsyncCloseable* parent) { return std::make_shared(parent, gate); }); - })); + ASSERT_OK(Nursery::RunInNursery( + [&](Nursery* nursery) { nursery->MakeSharedCloseable(gate); })); + finished.store(true); + }); + + SleepABit(); + ASSERT_FALSE(finished.load()); + gate.MarkFinished(); + BusyWait(10, [&] { return finished.load(); }); + thread.join(); +}; + +template +void AssertDoesNotCloseEarlyWithChild() { + Future<> gate = Future<>::Make(); + std::atomic finished{false}; + std::thread thread([&] { + ASSERT_OK(Nursery::RunInNursery( + [&](Nursery* nursery) { nursery->MakeSharedCloseable(nursery, gate); })); finished.store(true); }); @@ -114,7 +134,13 @@ void AssertDoesNotCloseEarly() { TEST(AsyncNursery, DoClose) { AssertDoesNotCloseEarly(); } -TEST(AsyncNursery, ChildDoClose) { AssertDoesNotCloseEarly(); } +TEST(AsyncNursery, SharedChildDoClose) { + AssertDoesNotCloseEarlyWithChild(); +} + +TEST(AsyncNursery, UniqueChildDoClose) { + AssertDoesNotCloseEarlyWithChild(); +} TEST(AsyncNursery, DependentTask) { AssertDoesNotCloseEarly(); @@ -126,16 +152,16 @@ TEST(AsyncNursery, EvictedChild) { std::atomic finished{false}; std::thread thread([&] { ASSERT_OK(Nursery::RunInNursery([&](Nursery* nursery) { + bool child_destroyed = false; std::shared_ptr evicts_child = - nursery->AddRoot([&](AsyncCloseable* parent) { - return std::make_shared(parent, child_future, final_future); - }); + nursery->MakeSharedCloseable(nursery, &child_destroyed, + child_future, final_future); evicts_child->EvictChild(); // Owner no longer has reference to child here but it's kept alive by nursery // because it isn't done - ASSERT_FALSE(evicts_child->ChildIsExpired()); + ASSERT_FALSE(child_destroyed); child_future.MarkFinished(); - ASSERT_TRUE(evicts_child->ChildIsExpired()); + ASSERT_TRUE(child_destroyed); })); finished.store(true); }); From f718e6f1579a2bd3fc7247691c864f6a169c4453 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 20 Aug 2021 14:12:21 -1000 Subject: [PATCH 03/11] ARROW-13542: A few more tweaks to the async nursery --- cpp/src/arrow/util/async_nursery.cc | 4 ++++ cpp/src/arrow/util/async_nursery.h | 20 +++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/async_nursery.cc b/cpp/src/arrow/util/async_nursery.cc index e0ff2fda081..bf0c2d2ad1a 100644 --- a/cpp/src/arrow/util/async_nursery.cc +++ b/cpp/src/arrow/util/async_nursery.cc @@ -87,6 +87,10 @@ Status AsyncCloseable::CheckClosed() const { return Status::OK(); } +void AsyncCloseablePimpl::Init(AsyncCloseable* impl) { impl_ = impl; } +void AsyncCloseablePimpl::Destroy() { impl_->Destroy(); } +void AsyncCloseablePimpl::SetNursery(Nursery* nursery) { impl_->SetNursery(nursery); } + Nursery::Nursery() : finished_(Future<>::Make()){}; Status Nursery::WaitForFinish() { diff --git a/cpp/src/arrow/util/async_nursery.h b/cpp/src/arrow/util/async_nursery.h index d7f63fb80d6..e906bd0386f 100644 --- a/cpp/src/arrow/util/async_nursery.h +++ b/cpp/src/arrow/util/async_nursery.h @@ -29,6 +29,7 @@ namespace arrow { namespace util { class Nursery; +class AsyncCloseablePimpl; template struct DestroyingDeleter { @@ -62,6 +63,8 @@ class AsyncCloseable : public std::enable_shared_from_this { /// return an invalid status if this object has started closing Status CheckClosed() const; + Nursery* nursery_; + private: void SetNursery(Nursery* nursery); void Destroy(); @@ -70,7 +73,22 @@ class AsyncCloseable : public std::enable_shared_from_this { Future<> tasks_finished_; std::atomic closed_{false}; std::atomic num_tasks_outstanding_{1}; - Nursery* nursery_; + + friend Nursery; + template + friend struct DestroyingDeleter; + friend AsyncCloseablePimpl; +}; + +class AsyncCloseablePimpl { + protected: + void Init(AsyncCloseable* impl); + + private: + void SetNursery(Nursery* nursery); + void Destroy(); + + AsyncCloseable* impl_; friend Nursery; template From 3109527462e3222a8ecf6cd840013efef204c6f3 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 23 Aug 2021 11:08:02 -1000 Subject: [PATCH 04/11] ARROW-13542: Logic for parent pointer was backwards. If you pass a parent then the parent should stay alive until the child is finished, not the other way around. --- cpp/src/arrow/util/async_nursery.cc | 2 +- cpp/src/arrow/util/async_nursery_test.cc | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/util/async_nursery.cc b/cpp/src/arrow/util/async_nursery.cc index bf0c2d2ad1a..8e26f5809ff 100644 --- a/cpp/src/arrow/util/async_nursery.cc +++ b/cpp/src/arrow/util/async_nursery.cc @@ -24,7 +24,7 @@ namespace util { AsyncCloseable::AsyncCloseable() = default; AsyncCloseable::AsyncCloseable(AsyncCloseable* parent) { - AddDependentTask(parent->OnClosed()); + parent->AddDependentTask(OnClosed()); } AsyncCloseable::~AsyncCloseable() { diff --git a/cpp/src/arrow/util/async_nursery_test.cc b/cpp/src/arrow/util/async_nursery_test.cc index e72d9e2651e..ba33cfd2b0b 100644 --- a/cpp/src/arrow/util/async_nursery_test.cc +++ b/cpp/src/arrow/util/async_nursery_test.cc @@ -45,7 +45,10 @@ class GatingDoCloseWithUniqueChild : public AsyncCloseable { : child_( nursery->MakeUniqueCloseable(this, std::move(close_future))) {} - Future<> DoClose() override { return Future<>::MakeFinished(); } + Future<> DoClose() override { + child_.reset(); + return Future<>::MakeFinished(); + } std::unique_ptr> child_; }; @@ -56,7 +59,10 @@ class GatingDoCloseWithSharedChild : public AsyncCloseable { : child_( nursery->MakeSharedCloseable(this, std::move(close_future))) {} - Future<> DoClose() override { return Future<>::MakeFinished(); } + Future<> DoClose() override { + child_.reset(); + return Future<>::MakeFinished(); + } std::shared_ptr child_; }; From b0eb1c561ca64881af20747a907b097a3d52d67b Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 23 Aug 2021 13:42:57 -1000 Subject: [PATCH 05/11] ARROW-13542: Lint --- cpp/src/arrow/util/async_nursery.cc | 4 ++-- cpp/src/arrow/util/async_nursery_test.cc | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/util/async_nursery.cc b/cpp/src/arrow/util/async_nursery.cc index 8e26f5809ff..aff6cc1a5b7 100644 --- a/cpp/src/arrow/util/async_nursery.cc +++ b/cpp/src/arrow/util/async_nursery.cc @@ -45,7 +45,7 @@ void AsyncCloseable::AddDependentTask(const Future<>& task) { DCHECK(!closed_); if (num_tasks_outstanding_.fetch_add(1) == 1) { tasks_finished_ = Future<>::Make(); - }; + } task.AddCallback([this](const Status& st) { if (num_tasks_outstanding_.fetch_sub(1) == 1 && closed_.load()) { tasks_finished_.MarkFinished(st); @@ -91,7 +91,7 @@ void AsyncCloseablePimpl::Init(AsyncCloseable* impl) { impl_ = impl; } void AsyncCloseablePimpl::Destroy() { impl_->Destroy(); } void AsyncCloseablePimpl::SetNursery(Nursery* nursery) { impl_->SetNursery(nursery); } -Nursery::Nursery() : finished_(Future<>::Make()){}; +Nursery::Nursery() : finished_(Future<>::Make()) {} Status Nursery::WaitForFinish() { if (num_closeables_destroyed_.load() != num_closeables_created_.load()) { diff --git a/cpp/src/arrow/util/async_nursery_test.cc b/cpp/src/arrow/util/async_nursery_test.cc index ba33cfd2b0b..f3faf20ff93 100644 --- a/cpp/src/arrow/util/async_nursery_test.cc +++ b/cpp/src/arrow/util/async_nursery_test.cc @@ -32,7 +32,8 @@ class GatingDoClose : public AsyncCloseable { public: GatingDoClose(AsyncCloseable* parent, Future<> close_future) : AsyncCloseable(parent), close_future_(std::move(close_future)) {} - GatingDoClose(Future<> close_future) : close_future_(std::move(close_future)) {} + explicit GatingDoClose(Future<> close_future) + : close_future_(std::move(close_future)) {} Future<> DoClose() override { return close_future_; } @@ -69,7 +70,7 @@ class GatingDoCloseWithSharedChild : public AsyncCloseable { class GatingDoCloseAsDependentTask : public AsyncCloseable { public: - GatingDoCloseAsDependentTask(Future<> close_future) { + explicit GatingDoCloseAsDependentTask(Future<> close_future) { AddDependentTask(std::move(close_future)); } @@ -119,7 +120,7 @@ void AssertDoesNotCloseEarly() { gate.MarkFinished(); BusyWait(10, [&] { return finished.load(); }); thread.join(); -}; +} template void AssertDoesNotCloseEarlyWithChild() { @@ -136,7 +137,7 @@ void AssertDoesNotCloseEarlyWithChild() { gate.MarkFinished(); BusyWait(10, [&] { return finished.load(); }); thread.join(); -}; +} TEST(AsyncNursery, DoClose) { AssertDoesNotCloseEarly(); } @@ -180,4 +181,4 @@ TEST(AsyncNursery, EvictedChild) { } } // namespace util -} // namespace arrow \ No newline at end of file +} // namespace arrow From 1b0a5813f765aabf02341105cf73b4b9e4fc575d Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 23 Aug 2021 16:02:43 -1000 Subject: [PATCH 06/11] Lint --- cpp/src/arrow/util/CMakeLists.txt | 94 +++++++++++++++---------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 194e2e5d8e1..6b3bcff8aa5 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -26,57 +26,57 @@ arrow_install_all_headers("arrow/util") # arrow_test_main # -if (WIN32) - # This manifest enables long file paths on Windows 10+ - # See https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#enable-long-paths-in-windows-10-version-1607-and-later - if (MSVC) - set(IO_UTIL_TEST_SOURCES io_util_test.cc io_util_test.manifest) - else () - set(IO_UTIL_TEST_SOURCES io_util_test.cc io_util_test.rc) - endif () -else () - set(IO_UTIL_TEST_SOURCES io_util_test.cc) -endif () +if(WIN32) + # This manifest enables long file paths on Windows 10+ + # See https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#enable-long-paths-in-windows-10-version-1607-and-later + if(MSVC) + set(IO_UTIL_TEST_SOURCES io_util_test.cc io_util_test.manifest) + else() + set(IO_UTIL_TEST_SOURCES io_util_test.cc io_util_test.rc) + endif() +else() + set(IO_UTIL_TEST_SOURCES io_util_test.cc) +endif() add_arrow_test(utility-test - SOURCES - align_util_test.cc - async_generator_test.cc - async_nursery_test.cc - bit_block_counter_test.cc - bit_util_test.cc - cache_test.cc - checked_cast_test.cc - compression_test.cc - decimal_test.cc - formatting_util_test.cc - key_value_metadata_test.cc - hashing_test.cc - int_util_test.cc - ${IO_UTIL_TEST_SOURCES} - iterator_test.cc - logging_test.cc - queue_test.cc - range_test.cc - reflection_test.cc - rle_encoding_test.cc - stl_util_test.cc - string_test.cc - tdigest_test.cc - test_common.cc - time_test.cc - trie_test.cc - uri_test.cc - utf8_util_test.cc - value_parsing_test.cc - variant_test.cc) + SOURCES + align_util_test.cc + async_generator_test.cc + async_nursery_test.cc + bit_block_counter_test.cc + bit_util_test.cc + cache_test.cc + checked_cast_test.cc + compression_test.cc + decimal_test.cc + formatting_util_test.cc + key_value_metadata_test.cc + hashing_test.cc + int_util_test.cc + ${IO_UTIL_TEST_SOURCES} + iterator_test.cc + logging_test.cc + queue_test.cc + range_test.cc + reflection_test.cc + rle_encoding_test.cc + stl_util_test.cc + string_test.cc + tdigest_test.cc + test_common.cc + time_test.cc + trie_test.cc + uri_test.cc + utf8_util_test.cc + value_parsing_test.cc + variant_test.cc) add_arrow_test(threading-utility-test - SOURCES - cancel_test.cc - future_test.cc - task_group_test.cc - thread_pool_test.cc) + SOURCES + cancel_test.cc + future_test.cc + task_group_test.cc + thread_pool_test.cc) add_arrow_benchmark(bit_block_counter_benchmark) add_arrow_benchmark(bit_util_benchmark) From 47399165ccd2ac559df0f5dc0aa0da4e192a5866 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 23 Aug 2021 16:39:28 -1000 Subject: [PATCH 07/11] Adding ARROW_EXPORT --- cpp/src/arrow/util/async_nursery.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/util/async_nursery.h b/cpp/src/arrow/util/async_nursery.h index e906bd0386f..4e5669e482e 100644 --- a/cpp/src/arrow/util/async_nursery.h +++ b/cpp/src/arrow/util/async_nursery.h @@ -32,7 +32,7 @@ class Nursery; class AsyncCloseablePimpl; template -struct DestroyingDeleter { +struct ARROW_EXPORT DestroyingDeleter { void operator()(T* p) { p->Destroy(); } }; @@ -41,7 +41,7 @@ struct DestroyingDeleter { /// Any AsyncCloseable must be kept alive until its parent is destroyed (this is a given /// if the parent is a nursery). For shorter lived tasks/objects consider /// OwnedAsyncCloseable adding a dependent task. -class AsyncCloseable : public std::enable_shared_from_this { +class ARROW_EXPORT AsyncCloseable : public std::enable_shared_from_this { public: AsyncCloseable(); explicit AsyncCloseable(AsyncCloseable* parent); @@ -80,7 +80,7 @@ class AsyncCloseable : public std::enable_shared_from_this { friend AsyncCloseablePimpl; }; -class AsyncCloseablePimpl { +class ARROW_EXPORT AsyncCloseablePimpl { protected: void Init(AsyncCloseable* impl); @@ -95,7 +95,7 @@ class AsyncCloseablePimpl { friend struct DestroyingDeleter; }; -class Nursery { +class ARROW_EXPORT Nursery { public: // FIXME: Add static_assert that T extends AsyncCloseable for friendlier error message template From 808d23be214a9e0989cc898bd2b38fecc0128ed5 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 23 Aug 2021 17:09:15 -1000 Subject: [PATCH 08/11] Fixing a bug in futures that could be exposed when synchronously waiting for a future to complete and then deleting the future (which still has callbacks to run that haven't yet been captured --- cpp/src/arrow/util/future.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index fc8022a95e4..c67e9b34969 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -312,11 +312,13 @@ class ConcreteFutureImpl : public FutureImpl { waiter_->MarkFutureFinishedUnlocked(waiter_arg_, state); } } - cv_.notify_all(); - auto callbacks = std::move(callbacks_); auto self = shared_from_this(); + // Don't notify until we've saved off all state. After notifying it is common for + // this to be deleted and we shouldn't access any local state. + cv_.notify_all(); + // run callbacks, lock not needed since the future is finished by this // point so nothing else can modify the callbacks list and it is safe // to iterate. From debb3b37b09feaa48dddd085f4bdd2257d78f169 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 23 Aug 2021 17:28:02 -1000 Subject: [PATCH 09/11] Still working on ARROW_EPXORT correctness --- cpp/src/arrow/util/async_nursery.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/async_nursery.h b/cpp/src/arrow/util/async_nursery.h index 4e5669e482e..0218a99505d 100644 --- a/cpp/src/arrow/util/async_nursery.h +++ b/cpp/src/arrow/util/async_nursery.h @@ -32,7 +32,7 @@ class Nursery; class AsyncCloseablePimpl; template -struct ARROW_EXPORT DestroyingDeleter { +struct DestroyingDeleter { void operator()(T* p) { p->Destroy(); } }; From 7561cfbb8bb848800131b7ad66c84209cb6a1a67 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 27 Aug 2021 09:13:39 -1000 Subject: [PATCH 10/11] Cleanup and address PR review --- cpp/src/arrow/util/async_nursery.cc | 15 ++++----------- cpp/src/arrow/util/async_nursery.h | 14 ++++++-------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/util/async_nursery.cc b/cpp/src/arrow/util/async_nursery.cc index aff6cc1a5b7..def884a1f50 100644 --- a/cpp/src/arrow/util/async_nursery.cc +++ b/cpp/src/arrow/util/async_nursery.cc @@ -22,24 +22,17 @@ namespace arrow { namespace util { -AsyncCloseable::AsyncCloseable() = default; -AsyncCloseable::AsyncCloseable(AsyncCloseable* parent) { +AsyncCloseable::AsyncCloseable() : on_closed_(Future<>::Make()) {} +AsyncCloseable::AsyncCloseable(AsyncCloseable* parent) : on_closed_(Future<>::Make()) { parent->AddDependentTask(OnClosed()); } AsyncCloseable::~AsyncCloseable() { - // FIXME - Would be awesome if there were a way to enforce this at compile time DCHECK_NE(nursery_, nullptr) << "An AsyncCloseable must be created with a nursery " "using MakeSharedCloseable or MakeUniqueCloseable"; } -const Future<>& AsyncCloseable::OnClosed() { - // Lazily create the future to save effort if we don't need it - if (!on_closed_.is_valid()) { - on_closed_ = Future<>::Make(); - } - return on_closed_; -} +const Future<>& AsyncCloseable::OnClosed() { return on_closed_; } void AsyncCloseable::AddDependentTask(const Future<>& task) { DCHECK(!closed_); @@ -118,7 +111,7 @@ Status Nursery::RunInNursery(std::function task) { return nursery.WaitForFinish(); } -Status Nursery::RunInNurserySt(std::function task) { +Status Nursery::RunInNursery(std::function task) { Nursery nursery; Status task_st = task(&nursery); // Need to wait for everything to finish, even if invalid status diff --git a/cpp/src/arrow/util/async_nursery.h b/cpp/src/arrow/util/async_nursery.h index 0218a99505d..96aa135eee1 100644 --- a/cpp/src/arrow/util/async_nursery.h +++ b/cpp/src/arrow/util/async_nursery.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef ARROW_ASYNC_NURSERY_H -#define ARROW_ASYNC_NURSERY_H +#pragma once #include @@ -101,6 +100,8 @@ class ARROW_EXPORT Nursery { template typename std::enable_if::value, std::shared_ptr>::type MakeSharedCloseable(Args&&... args) { + static_assert(std::is_base_of::value, + "Nursery::MakeSharedCloseable only works with AsyncCloseable types"); num_closeables_created_.fetch_add(1); num_tasks_outstanding_.fetch_add(1); std::shared_ptr shared_closeable(new T(std::forward(args)...), @@ -113,6 +114,8 @@ class ARROW_EXPORT Nursery { typename std::enable_if::value, std::unique_ptr>>::type MakeUniqueCloseable(Args&&... args) { + static_assert(std::is_base_of::value, + "Nursery::MakeUniqueCloseable only works with AsyncCloseable types"); num_closeables_created_.fetch_add(1); num_tasks_outstanding_.fetch_add(1); auto unique_closeable = std::unique_ptr>( @@ -130,10 +133,7 @@ class ARROW_EXPORT Nursery { /// Runs `task` within a nursery. This method will not return until /// all roots added by the task have been closed and destroyed. static Status RunInNursery(std::function task); - // FIXME template hackery to callable that can return void/Status/Result - // to work correctly with PIMPL objects this may just mean making sure the - // Destroy method works. Or maybe we want a marker class for PIMPL? - static Status RunInNurserySt(std::function task); + static Status RunInNursery(std::function task); protected: Status WaitForFinish(); @@ -153,5 +153,3 @@ class ARROW_EXPORT Nursery { } // namespace util } // namespace arrow - -#endif // ARROW_ASYNC_NURSERY_H From 389292ec6123b453958c7f7669cee3090aceec7b Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 27 Aug 2021 09:51:55 -1000 Subject: [PATCH 11/11] Removed stale comment --- cpp/src/arrow/util/async_nursery.h | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/util/async_nursery.h b/cpp/src/arrow/util/async_nursery.h index 96aa135eee1..9ae1f9aa548 100644 --- a/cpp/src/arrow/util/async_nursery.h +++ b/cpp/src/arrow/util/async_nursery.h @@ -96,7 +96,6 @@ class ARROW_EXPORT AsyncCloseablePimpl { class ARROW_EXPORT Nursery { public: - // FIXME: Add static_assert that T extends AsyncCloseable for friendlier error message template typename std::enable_if::value, std::shared_ptr>::type MakeSharedCloseable(Args&&... args) {