From cef377afda08a63b2665746c472992166d20946a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 30 Aug 2021 17:07:09 -1000 Subject: [PATCH 1/5] Created basic async utilities for an async smart pointer and an async task group --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/util/CMakeLists.txt | 1 + cpp/src/arrow/util/async_util.cc | 75 ++++++++++++++ cpp/src/arrow/util/async_util.h | 105 ++++++++++++++++++++ cpp/src/arrow/util/async_util_test.cc | 134 ++++++++++++++++++++++++++ 5 files changed, 316 insertions(+) create mode 100644 cpp/src/arrow/util/async_util.cc create mode 100644 cpp/src/arrow/util/async_util.h create mode 100644 cpp/src/arrow/util/async_util_test.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 637f3d1a54f..e06fad9a1de 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -190,6 +190,7 @@ set(ARROW_SRCS io/slow.cc io/stdio.cc io/transform.cc + util/async_util.cc util/basic_decimal.cc util/bit_block_counter.cc util/bit_run_reader.cc diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index aa875ab6bee..1b14215ddd8 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -42,6 +42,7 @@ add_arrow_test(utility-test SOURCES align_util_test.cc async_generator_test.cc + async_util_test.cc bit_block_counter_test.cc bit_util_test.cc cache_test.cc diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc new file mode 100644 index 00000000000..f3972cfb7b1 --- /dev/null +++ b/cpp/src/arrow/util/async_util.cc @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/async_util.h" + +#include "arrow/util/future.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace util { + +AsyncDestroyable::AsyncDestroyable() : on_closed_(Future<>::Make()) {} + +#ifndef NDEBUG +AsyncDestroyable::~AsyncDestroyable() { + DCHECK(constructed_correctly_) << "An instance of AsyncDestroyable must be created by " + "MakeSharedAsync or MakeUniqueAsync"; +} +#elif +AsyncDestroyable::~AsyncDestroyable() = default; +#endif + +void AsyncDestroyable::Destroy() { + DoDestroy().AddCallback([this](const Status& st) { + on_closed_.MarkFinished(st); + delete this; + }); +} + +Status AsyncTaskGroup::AddTask(const Future<>& task) { + auto guard = mutex_.Lock(); + if (finished_adding_) { + return Status::Invalid("Attempt to add a task after StopAddingAndWait"); + } + if (!err_.ok()) { + return err_; + } + running_tasks_++; + guard.Unlock(); + task.AddCallback([this](const Status& st) { + auto guard = mutex_.Lock(); + err_ &= st; + if (--running_tasks_ == 0 && finished_adding_) { + guard.Unlock(); + all_tasks_done_.MarkFinished(err_); + } + }); + return Status::OK(); +} + +Future<> AsyncTaskGroup::StopAddingAndWait() { + auto guard = mutex_.Lock(); + finished_adding_ = true; + if (running_tasks_ == 0) { + return err_; + } + return all_tasks_done_; +} + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h new file mode 100644 index 00000000000..ea3e3f3bd82 --- /dev/null +++ b/cpp/src/arrow/util/async_util.h @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/future.h" +#include "arrow/util/mutex.h" + +namespace arrow { +namespace util { + +template +struct DestroyingDeleter { + void operator()(T* p) { p->Destroy(); } +}; + +/// An object which should be asynchronously closed before it is destroyed +/// +/// Classes can extend this to ensure that the close method is called and completed +/// before the instance is deleted. +/// +/// Classes which extend this must be constructed using MakeSharedAsync or MakeUniqueAsync +class ARROW_EXPORT AsyncDestroyable { + public: + AsyncDestroyable(); + virtual ~AsyncDestroyable(); + + Future<> on_closed() { return on_closed_; } + + protected: + /// Subclasses should override this and perform any cleanup. Once the future returned + /// by this method finishes then this object is eligible for destruction and any + /// reference to `this` will be invalid + virtual Future<> DoDestroy() = 0; + + private: + void Destroy(); + + Future<> on_closed_; +#ifndef NDEBUG + bool constructed_correctly_ = false; +#endif + + template + friend struct DestroyingDeleter; + template + friend std::shared_ptr MakeSharedAsync(Args&&... args); + template + friend std::unique_ptr> MakeUniqueAsync(Args&&... args); +}; + +template +std::shared_ptr MakeSharedAsync(Args&&... args) { + static_assert(std::is_base_of::value, + "Nursery::MakeSharedCloseable only works with AsyncDestroyable types"); + std::shared_ptr ptr(new T(std::forward(args)...), DestroyingDeleter()); +#ifndef NDEBUG + ptr->constructed_correctly_ = true; +#endif + return ptr; +} + +template +std::unique_ptr> MakeUniqueAsync(Args&&... args) { + static_assert(std::is_base_of::value, + "Nursery::MakeUniqueCloseable only works with AsyncDestroyable types"); + std::unique_ptr> ptr(new T(std::forward(args)...), + DestroyingDeleter()); +#ifndef NDEBUG + ptr->constructed_correctly_ = true; +#endif + return ptr; +} + +class AsyncTaskGroup { + public: + Status AddTask(const Future<>& task); + Future<> StopAddingAndWait(); + + private: + bool finished_adding_ = false; + int running_tasks_ = 0; + Status err_; + Future<> all_tasks_done_ = Future<>::Make(); + util::Mutex mutex_; +}; + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc new file mode 100644 index 00000000000..266e587599b --- /dev/null +++ b/cpp/src/arrow/util/async_util_test.cc @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/async_util.h" + +#include + +#include "arrow/result.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" + +namespace arrow { +namespace util { + +class GatingDestroyable : public AsyncDestroyable { + public: + GatingDestroyable(Future<> close_future, bool* destroyed) + : close_future_(std::move(close_future)), destroyed_(destroyed) {} + ~GatingDestroyable() override { *destroyed_ = true; } + + protected: + Future<> DoDestroy() override { return close_future_; } + + private: + Future<> close_future_; + bool* destroyed_; +}; + +template +void TestAsyncDestroyable(Factory factory) { + Future<> gate = Future<>::Make(); + bool destroyed = false; + bool on_closed = false; + { + auto obj = factory(gate, &destroyed); + obj->on_closed().AddCallback([&](const Status& st) { on_closed = true; }); + ASSERT_FALSE(destroyed); + } + ASSERT_FALSE(destroyed); + ASSERT_FALSE(on_closed); + gate.MarkFinished(); + ASSERT_TRUE(destroyed); + ASSERT_TRUE(on_closed); +} + +TEST(AsyncDestroyable, MakeShared) { + TestAsyncDestroyable([](Future<> gate, bool* destroyed) { + return MakeSharedAsync(gate, destroyed); + }); +} + +TEST(AsyncDestroyable, MakeUnique) { + Future<> gate = Future<>::Make(); + bool destroyed = false; + bool on_closed = false; + { + std::unique_ptr> obj = + MakeUniqueAsync(gate, &destroyed); + obj->on_closed().AddCallback([&](const Status& st) { on_closed = true; }); + ASSERT_FALSE(destroyed); + } + ASSERT_FALSE(destroyed); + ASSERT_FALSE(on_closed); + gate.MarkFinished(); + ASSERT_TRUE(destroyed); + ASSERT_TRUE(on_closed); +} + +TEST(AsyncTaskGroup, Basic) { + AsyncTaskGroup task_group; + Future<> fut1 = Future<>::Make(); + Future<> fut2 = Future<>::Make(); + ASSERT_OK(task_group.AddTask(fut1)); + ASSERT_OK(task_group.AddTask(fut2)); + Future<> all_done = task_group.StopAddingAndWait(); + AssertNotFinished(all_done); + fut1.MarkFinished(); + AssertNotFinished(all_done); + fut2.MarkFinished(); + ASSERT_FINISHES_OK(all_done); +} + +TEST(AsyncTaskGroup, NoTasks) { + AsyncTaskGroup task_group; + ASSERT_FINISHES_OK(task_group.StopAddingAndWait()); +} + +TEST(AsyncTaskGroup, AddAfterDone) { + AsyncTaskGroup task_group; + ASSERT_FINISHES_OK(task_group.StopAddingAndWait()); + ASSERT_RAISES(Invalid, task_group.AddTask(Future<>::Make())); +} + +TEST(AsyncTaskGroup, Error) { + AsyncTaskGroup task_group; + Future<> failed_task = Future<>::MakeFinished(Status::Invalid("XYZ")); + ASSERT_OK(task_group.AddTask(failed_task)); + ASSERT_FINISHES_AND_RAISES(Invalid, task_group.StopAddingAndWait()); +} + +TEST(AsyncTaskGroup, TaskFinishesAfterError) { + AsyncTaskGroup task_group; + Future<> fut1 = Future<>::Make(); + ASSERT_OK(task_group.AddTask(fut1)); + ASSERT_OK(task_group.AddTask(Future<>::MakeFinished(Status::Invalid("XYZ")))); + Future<> finished_fut = task_group.StopAddingAndWait(); + AssertNotFinished(finished_fut); + fut1.MarkFinished(); + ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut); +} + +TEST(AsyncTaskGroup, AddAfterFailed) { + AsyncTaskGroup task_group; + ASSERT_OK(task_group.AddTask(Future<>::MakeFinished(Status::Invalid("XYZ")))); + ASSERT_RAISES(Invalid, task_group.AddTask(Future<>::Make())); + ASSERT_FINISHES_AND_RAISES(Invalid, task_group.StopAddingAndWait()); +} + +} // namespace util +} // namespace arrow \ No newline at end of file From 681ca59c8dd54bd0e2b0626dd06c3258450b27c8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 3 Sep 2021 13:28:38 -1000 Subject: [PATCH 2/5] Changed the semantics on a completed task group slightly and improved the documentation. --- cpp/src/arrow/util/async_util.cc | 15 +++++++++---- cpp/src/arrow/util/async_util.h | 31 +++++++++++++++++++++++++-- cpp/src/arrow/util/async_util_test.cc | 28 ++++++++++++++++++------ 3 files changed, 62 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index f3972cfb7b1..1e3367db107 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -43,12 +43,18 @@ void AsyncDestroyable::Destroy() { Status AsyncTaskGroup::AddTask(const Future<>& task) { auto guard = mutex_.Lock(); - if (finished_adding_) { - return Status::Invalid("Attempt to add a task after StopAddingAndWait"); + if (all_tasks_done_.is_finished()) { + return Status::Invalid("Attempt to add a task after the task group has completed"); } if (!err_.ok()) { return err_; } + // If the task is already finished there is nothing to track so lets save + // some work and return early + if (task.is_finished()) { + err_ &= task.status(); + return Status::OK(); + } running_tasks_++; guard.Unlock(); task.AddCallback([this](const Status& st) { @@ -62,11 +68,12 @@ Status AsyncTaskGroup::AddTask(const Future<>& task) { return Status::OK(); } -Future<> AsyncTaskGroup::StopAddingAndWait() { +Future<> AsyncTaskGroup::WaitForTasksToFinish() { auto guard = mutex_.Lock(); finished_adding_ = true; if (running_tasks_ == 0) { - return err_; + all_tasks_done_.MarkFinished(err_); + return all_tasks_done_; } return all_tasks_done_; } diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index ea3e3f3bd82..8dc51f8e0e3 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -25,6 +25,7 @@ namespace arrow { namespace util { +/// Custom deleter for AsyncDestroyable objects template struct DestroyingDeleter { void operator()(T* p) { p->Destroy(); } @@ -33,7 +34,8 @@ struct DestroyingDeleter { /// An object which should be asynchronously closed before it is destroyed /// /// Classes can extend this to ensure that the close method is called and completed -/// before the instance is deleted. +/// before the instance is deleted. This provides smart_ptr / delete semantics for +/// objects with an asynchronous destructor. /// /// Classes which extend this must be constructed using MakeSharedAsync or MakeUniqueAsync class ARROW_EXPORT AsyncDestroyable { @@ -41,6 +43,11 @@ class ARROW_EXPORT AsyncDestroyable { AsyncDestroyable(); virtual ~AsyncDestroyable(); + /// A future which will complete when the AsyncDestroyable has finished and is ready + /// to be deleted. + /// + /// This can be used to ensure all work done by this object has been completed before + /// proceeding. Future<> on_closed() { return on_closed_; } protected: @@ -88,10 +95,30 @@ std::unique_ptr> MakeUniqueAsync(Args&&... args) { return ptr; } +/// A utility which keeps track of a collection of asynchronous tasks +/// +/// This can be used to provide structured concurrency for asynchronous development. +/// A task group created at a high level can be distributed amongst low level components +/// which register work to be completed. The high level job can then wait for all work +/// to be completed before cleaning up. class AsyncTaskGroup { public: + /// Add a task to be tracked by this task group + /// + /// If a previous task has failed then adding a task will fail + /// + /// If WaitForTasksToFinish has been called and the returned future has been marked + /// completed then adding a task will fail. Status AddTask(const Future<>& task); - Future<> StopAddingAndWait(); + /// A future that will be completed when all running tasks are finished. + /// + /// It is allowed for tasks to be added after this call provided the future has not yet + /// completed. This should be safe as long as the tasks being added are added as part + /// of a task that is tracked. As soon as the count of running tasks reaches 0 this + /// future will be marked complete. + /// + /// Any attempt to add a task after the returned future has completed will fail. + Future<> WaitForTasksToFinish(); private: bool finished_adding_ = false; diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc index 266e587599b..416cfe2034e 100644 --- a/cpp/src/arrow/util/async_util_test.cc +++ b/cpp/src/arrow/util/async_util_test.cc @@ -86,7 +86,7 @@ TEST(AsyncTaskGroup, Basic) { Future<> fut2 = Future<>::Make(); ASSERT_OK(task_group.AddTask(fut1)); ASSERT_OK(task_group.AddTask(fut2)); - Future<> all_done = task_group.StopAddingAndWait(); + Future<> all_done = task_group.WaitForTasksToFinish(); AssertNotFinished(all_done); fut1.MarkFinished(); AssertNotFinished(all_done); @@ -96,20 +96,36 @@ TEST(AsyncTaskGroup, Basic) { TEST(AsyncTaskGroup, NoTasks) { AsyncTaskGroup task_group; - ASSERT_FINISHES_OK(task_group.StopAddingAndWait()); + ASSERT_FINISHES_OK(task_group.WaitForTasksToFinish()); } TEST(AsyncTaskGroup, AddAfterDone) { AsyncTaskGroup task_group; - ASSERT_FINISHES_OK(task_group.StopAddingAndWait()); + ASSERT_FINISHES_OK(task_group.WaitForTasksToFinish()); ASSERT_RAISES(Invalid, task_group.AddTask(Future<>::Make())); } +TEST(AsyncTaskGroup, AddAfterWaitButBeforeFinish) { + AsyncTaskGroup task_group; + Future<> task_one = Future<>::Make(); + ASSERT_OK(task_group.AddTask(task_one)); + Future<> finish_fut = task_group.WaitForTasksToFinish(); + AssertNotFinished(finish_fut); + Future<> task_two = Future<>::Make(); + ASSERT_OK(task_group.AddTask(task_two)); + AssertNotFinished(finish_fut); + task_one.MarkFinished(); + AssertNotFinished(finish_fut); + task_two.MarkFinished(); + AssertFinished(finish_fut); + ASSERT_FINISHES_OK(finish_fut); +} + TEST(AsyncTaskGroup, Error) { AsyncTaskGroup task_group; Future<> failed_task = Future<>::MakeFinished(Status::Invalid("XYZ")); ASSERT_OK(task_group.AddTask(failed_task)); - ASSERT_FINISHES_AND_RAISES(Invalid, task_group.StopAddingAndWait()); + ASSERT_FINISHES_AND_RAISES(Invalid, task_group.WaitForTasksToFinish()); } TEST(AsyncTaskGroup, TaskFinishesAfterError) { @@ -117,7 +133,7 @@ TEST(AsyncTaskGroup, TaskFinishesAfterError) { Future<> fut1 = Future<>::Make(); ASSERT_OK(task_group.AddTask(fut1)); ASSERT_OK(task_group.AddTask(Future<>::MakeFinished(Status::Invalid("XYZ")))); - Future<> finished_fut = task_group.StopAddingAndWait(); + Future<> finished_fut = task_group.WaitForTasksToFinish(); AssertNotFinished(finished_fut); fut1.MarkFinished(); ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut); @@ -127,7 +143,7 @@ TEST(AsyncTaskGroup, AddAfterFailed) { AsyncTaskGroup task_group; ASSERT_OK(task_group.AddTask(Future<>::MakeFinished(Status::Invalid("XYZ")))); ASSERT_RAISES(Invalid, task_group.AddTask(Future<>::Make())); - ASSERT_FINISHES_AND_RAISES(Invalid, task_group.StopAddingAndWait()); + ASSERT_FINISHES_AND_RAISES(Invalid, task_group.WaitForTasksToFinish()); } } // namespace util From a765bb069da563538568de8007dcec5b6c4ef994 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 3 Sep 2021 16:46:18 -1000 Subject: [PATCH 3/5] Lint --- cpp/src/arrow/util/async_util.h | 2 +- cpp/src/arrow/util/async_util_test.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index 8dc51f8e0e3..293db492013 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -106,7 +106,7 @@ class AsyncTaskGroup { /// Add a task to be tracked by this task group /// /// If a previous task has failed then adding a task will fail - /// + /// /// If WaitForTasksToFinish has been called and the returned future has been marked /// completed then adding a task will fail. Status AddTask(const Future<>& task); diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc index 416cfe2034e..13d8a69400f 100644 --- a/cpp/src/arrow/util/async_util_test.cc +++ b/cpp/src/arrow/util/async_util_test.cc @@ -147,4 +147,4 @@ TEST(AsyncTaskGroup, AddAfterFailed) { } } // namespace util -} // namespace arrow \ No newline at end of file +} // namespace arrow From 4d0d683776094a5ef3383a8b17c630c3eda39fad Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 3 Sep 2021 16:54:59 -1000 Subject: [PATCH 4/5] Fix typo in #ifndef macro --- cpp/src/arrow/util/async_util.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 1e3367db107..76c971f576e 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -30,7 +30,7 @@ AsyncDestroyable::~AsyncDestroyable() { DCHECK(constructed_correctly_) << "An instance of AsyncDestroyable must be created by " "MakeSharedAsync or MakeUniqueAsync"; } -#elif +#else AsyncDestroyable::~AsyncDestroyable() = default; #endif From b6ca920d155bdbe97e79cf60048e6ab93c6c0ea5 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 8 Sep 2021 14:23:11 -1000 Subject: [PATCH 5/5] ARROW-13542: Addressing PR comments --- cpp/src/arrow/util/async_util.h | 2 +- cpp/src/arrow/util/async_util_test.cc | 32 +++++++++++++++------------ 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index 293db492013..31e5d09a86c 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -101,7 +101,7 @@ std::unique_ptr> MakeUniqueAsync(Args&&... args) { /// A task group created at a high level can be distributed amongst low level components /// which register work to be completed. The high level job can then wait for all work /// to be completed before cleaning up. -class AsyncTaskGroup { +class ARROW_EXPORT AsyncTaskGroup { public: /// Add a task to be tracked by this task group /// diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc index 13d8a69400f..f263ee548cf 100644 --- a/cpp/src/arrow/util/async_util_test.cc +++ b/cpp/src/arrow/util/async_util_test.cc @@ -64,20 +64,9 @@ TEST(AsyncDestroyable, MakeShared) { } TEST(AsyncDestroyable, MakeUnique) { - Future<> gate = Future<>::Make(); - bool destroyed = false; - bool on_closed = false; - { - std::unique_ptr> obj = - MakeUniqueAsync(gate, &destroyed); - obj->on_closed().AddCallback([&](const Status& st) { on_closed = true; }); - ASSERT_FALSE(destroyed); - } - ASSERT_FALSE(destroyed); - ASSERT_FALSE(on_closed); - gate.MarkFinished(); - ASSERT_TRUE(destroyed); - ASSERT_TRUE(on_closed); + TestAsyncDestroyable([](Future<> gate, bool* destroyed) { + return MakeUniqueAsync(gate, destroyed); + }); } TEST(AsyncTaskGroup, Basic) { @@ -146,5 +135,20 @@ TEST(AsyncTaskGroup, AddAfterFailed) { ASSERT_FINISHES_AND_RAISES(Invalid, task_group.WaitForTasksToFinish()); } +TEST(AsyncTaskGroup, FailAfterAdd) { + AsyncTaskGroup task_group; + Future<> will_fail = Future<>::Make(); + ASSERT_OK(task_group.AddTask(will_fail)); + Future<> added_later_and_passes = Future<>::Make(); + ASSERT_OK(task_group.AddTask(added_later_and_passes)); + will_fail.MarkFinished(Status::Invalid("XYZ")); + ASSERT_RAISES(Invalid, task_group.AddTask(Future<>::Make())); + Future<> finished_fut = task_group.WaitForTasksToFinish(); + AssertNotFinished(finished_fut); + added_later_and_passes.MarkFinished(); + AssertFinished(finished_fut); + ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut); +} + } // namespace util } // namespace arrow